#!/usr/bin/env python3 import asyncio import datetime import json import logging import os import shlex import signal import sys import syslog import urllib.error import urllib.request from typing import Any, AsyncIterator log = logging.getLogger('journal2ntfy') Json = dict[str, Any] NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue') NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'alerts') ntfy_lock = asyncio.Lock() async def follow_journal(*filters: str) -> AsyncIterator[Json]: cmd = [ 'journalctl', '--since=now', '--output=json', '--follow', ] cmd += filters if log.isEnabledFor(logging.DEBUG): log.debug( 'Running command: %s', ' '.join(shlex.quote(str(a)) for a in cmd) ) p = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) assert p.stdout try: async for line in p.stdout: try: data = json.loads(line) except ValueError as e: log.error('Failed to parse journal entry: %s', e) continue try: yield data except Exception: log.exception('Error handling message:') continue except asyncio.CancelledError: log.debug('Terminating child process') p.terminate() try: await asyncio.wait_for(p.wait(), 10) except asyncio.TimeoutError: log.warning('Child process did not terminate when asked, killing') p.kill() await p.wait() raise async def on_message(data: Json) -> None: message = data.get('MESSAGE') if not message: log.debug('Ignoring journal record with empty message') return try: timestamp = datetime.datetime.fromtimestamp( int(data['__REALTIME_TIMESTAMP']) / 1e6 ) except (KeyError, ValueError) as e: log.warning('Could not parse message timestamp: %s', e) timestamp = datetime.datetime.now() try: hostname = data['_HOSTNAME'] except KeyError: hostname = os.uname().nodename try: ident = data['SYSLOG_IDENTIFIER'] except KeyError: ident = data.get('_TRANSPORT', '') message = ( f'{timestamp.strftime("%b %d %H:%M:%S")} {hostname} {ident}: ' f'{message}' ) try: priority = int(data['PRIORITY']) except (KeyError, ValueError): priority = 0 if priority <= syslog.LOG_CRIT: tag = 'rotating_light' elif priority == syslog.LOG_ERR: tag = 'red_circle' elif priority == syslog.LOG_WARNING: tag = 'warning' elif priority == syslog.LOG_NOTICE: tag = 'pushpin' elif priority == syslog.LOG_INFO: tag = 'information_source' else: tag = 'white_circle' req = urllib.request.Request( f'{NTFY_URL}/{NTFY_TOPIC}', data=message.encode('utf-8'), headers={ 'Tags': tag, 'Content-Type': 'text/plain; charset=utf-8', }, ) try: # The lock ensures that messages are delivered in the order # they were read from the journal, while `to_thread` keeps the # network request from blocking the event loop. async with ntfy_lock: res = await asyncio.to_thread(urllib.request.urlopen, req) except urllib.error.URLError as e: log.error('Could not send ntfy notification: %s', e) return if res.status == 200: log.debug('Successfully sent ntfy notification') else: log.error('Sending ntfy notification failed: %s', res.reason) def shutdown(signum: int) -> None: log.info('Received %s, shutting down', signal.Signals(signum).name) cur_task = asyncio.current_task() for task in asyncio.all_tasks(): if task is not cur_task: task.cancel() async def main(): logging.basicConfig(level=logging.DEBUG) loop = asyncio.get_running_loop() loop.add_signal_handler(signal.SIGTERM, shutdown, signal.SIGTERM) loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT) if len(sys.argv) > 1: filters = sys.argv[1:] else: filters = shlex.split(os.environ.get('JOURNAL2NTFY_FILTERS', '')) try: async for data in follow_journal(*filters): await on_message(data) except asyncio.CancelledError: return if __name__ == '__main__': asyncio.run(main())