from pathlib import Path import urllib.request from ansible.errors import AnsibleError from ansible.executor.stats import AggregateStats from ansible.playbook import Playbook from ansible.plugins.callback import CallbackBase DOCUMENTATION = r''' author: Dustin C. Hatch name: ntfy short_description: Send notifications to ntfy.sh description: - This plugin sends playbook failure notifications via ntfy.sh. options: server: description: ntfy.sh server type: str default: https://ntfy.sh env: - name: NTFY_SERVER ini: - section: callback_ntfy key: server topic: description: ntfy.sh topic name type: str default: ansible env: - name: NTFY_TOPIC ini: - section: callback_ntfy key: topic ''' class CallbackModule(CallbackBase): CALLBACK_VERSION = 1.0 CALLBACK_TYPE = 'notification' CALLBACK_NAME = 'ntfy' def __init__(self): super().__init__() self.playbook = None def set_options(self, task_keys=None, var_options=None, direct=None): super().set_options(task_keys, var_options, direct) ntfy_server = self.get_option('server').rstrip('/') if '://' not in ntfy_server: ntfy_server = f'http://{ntfy_server}' ntfy_topic = self.get_option('topic') self.ntfy_url = f'{ntfy_server}/{ntfy_topic}' def v2_playbook_on_start(self, playbook: Playbook): self.playbook = playbook def v2_playbook_on_stats(self, stats: AggregateStats): if not self.playbook: return if not stats.failures and not stats.dark: return assert self.playbook._file_name playbook = Path(self.playbook._file_name) results = {} hosts = set(stats.failures.keys()).union(stats.dark.keys()) title = f'Playbook {playbook.name} failed for {len(hosts)} hosts' for host in hosts: results[host] = { 'ok': stats.ok.get(host, 0), 'changed': stats.changed.get(host, 0), 'unreachable': stats.dark.get(host, 0), 'failed': stats.failures.get(host, 0), 'skipped': stats.skipped.get(host, 0), 'rescued': stats.rescued.get(host, 0), 'ignored': stats.ignored.get(host, 0), } lines = [] for host, result in results.items(): result_txt = ' '.join(f'{k}={v}' for k, v in result.items()) lines.append(f'{host} : {result_txt}') message = '\n'.join(lines) req = urllib.request.Request( self.ntfy_url, method='POST', data=message.encode('utf-8'), ) req.add_header('Title', title) req.add_header('Tag', 'red_circle') with urllib.request.urlopen(req) as response: status_code = response.getcode() if status_code < 200 or status_code >= 300: response_data = response.read() raise AnsibleError(f'Failed to send notification: {response_data.decode()}')