ansible-playbook api 调用
Inventory
如果需要动态生成 Inventory, 可以通过继承 InventoryManager 进行修改
from ansible.inventory.manager import InventoryManager
inventory_info = {'slave': [{'ip': '192.168.1.101',
'ansible_ssh_port': 2222,
'ansible_ssh_user': 'root'},
{'ip': '192.168.56.101',
'ansible_ssh_user': 'root'}],
'master': [{'ip': '192.168.56.101',
'ansible_ssh_user': 'root'}]}
group_vars = {'master': {'firewall_port': [{'protocol': 'tcp', 'port': 5051},
{'protocol': 'udp', 'port': 5052}]}}
class AnsibleInventory(InventoryManager):
def __init__(self, inventory_info, group_vars, loader):
super().__init__(loader)
self.inventory_info = inventory_info
self.group_vars = group_vars
self.set_inventory()
def set_inventory(self):
for group, hosts in self.inventory_info.items():
self.add_group(group)
if group in self.group_vars:
for k, v in self.group_vars[group].items():
self.groups[group].set_variable(k, v)
for host in hosts:
self.add_host(host.get('ip'), group, host.get('ansible_ssh_port'))
self.get_host(host.get('ip')).set_variable('ansible_ssh_user', host.get('ansible_ssh_user'))
Options
ansible 运行时候需要的一些参数,也可以通过 namedtuple 来定义
class Options(object):
def __init__(self, connection='ssh', forks=10, listtags=False, listtasks=False, listhosts=False, syntax=False,
ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=False, become_method=None,
become_user=None, verbosity=None, check=False, diff=False, host_key_checking=False, module_path=None,
remote_user=None, private_key_file=None, ssh_common_args=None):
self.connection = connection
self.forks = forks
self.listtags = listtags
self.listtasks = listtasks
self.listhosts = listhosts
self.syntax = syntax
self.ssh_extra_args = ssh_extra_args
self.sftp_extra_args = sftp_extra_args
self.scp_extra_args = scp_extra_args
self.become = become
self.become_method = become_method
self.become_user = become_user
self.verbosity = verbosity
self.check = check
self.diff = diff
self.host_key_checking = host_key_checking
self.module_path = module_path
self.remote_user = remote_user
self.private_key_file = private_key_file
self.ssh_common_args = ssh_common_args
Callback
默认会将运行结果输出到 stdout 或者日志文件中,如果想对输出结果进行自定义或者其他 callback 操作,可以定义自己的 callback 类。
这里我需要保留 stdout 的输出并返回一些其他定义,继承了ansible.plugins.callback.default.CallbackModule
。如果不需要原始的callback,可以选择继承ansible.plugins.callback.CallbackBase
from ansible.plugins.callback.default import CallbackModule
class ResultsCallback(CallbackModule):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ok = {}
self.failed = {}
self.unreachable = {}
def v2_runner_on_ok(self, result):
self.ok[result._host.get_name()] = result
super().v2_runner_on_ok(result)
def v2_runner_on_failed(self, result, ignore_errors=False):
self.failed[result._host.get_name()] = self._dump_results(result._result)
super().v2_runner_on_failed(result, ignore_errors=False)
def v2_runner_on_unreachable(self, result):
self.unreachable[result._host.get_name()] = result
super().v2_runner_on_unreachable(result)
Playbook 调用
from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.vars.manager import VariableManager
class Playbook(object):
def __init__(self):
self.loader = DataLoader()
self.inventory = AnsibleInventory(inventory_info, loader)
# inventory = InventoryManager(loader, '/path/to/hosts') 如果不想动态生成,也可以指定 inventory文件
self.options = Options(connection='ssh', forks=10)
self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)
self.callback = ResultsCallback()
def run(self):
pbex = PlaybookExecutor(playbooks=['/path/to/playbook.yml'],
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords={})
pbex._tqm._stdout_callback = self.callback
pbex.run()
参考
ansible2.0 playbook api运维应用
Ansible常用模块API调用
Running Ansible 2 Programmatically