Files
configpolicy/roles/journal2ntfy/files/journal2ntfy.py
Dustin C. Hatch a7319c561d journal2ntfy: Script to send log messagess via ntfy
The `journal2ntfy.py` script follows the systemd journal by spawning
`journalctl` as a child process and reading from its standard output
stream.  Any command-line arguments passed to `journal2ntfy` are passed
to `journalctl`, which allows the caller to specify message filters.
For any matching journal message, `journal2ntfy` sends a message via
the *ntfy* web service.

For the BURP server, we're going to use `journal2ntfy` to generate
alerts about the RAID array.  When I reconnect the disk that was in the
fireproof safe, the kernel will log a message from the *md* subsystem
indicating that the resynchronization process has begun.  Then, when
the disks are again in sync, it will log another message, which will
let me know it is safe to archive the other disk.
2023-05-17 14:51:21 -05:00

170 lines
4.5 KiB
Python

#!/usr/bin/env python3
import asyncio
import datetime
import json
import logging
import os
import shlex
import signal
import sys
import syslog
import urllib.error
import urllib.request
from typing import Any, AsyncIterator
log = logging.getLogger('journal2ntfy')
Json = dict[str, Any]
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'alerts')
ntfy_lock = asyncio.Lock()
async def follow_journal(*filters: str) -> AsyncIterator[Json]:
cmd = [
'journalctl',
'--since=now',
'--output=json',
'--follow',
]
cmd += filters
if log.isEnabledFor(logging.DEBUG):
log.debug(
'Running command: %s', ' '.join(shlex.quote(str(a)) for a in cmd)
)
p = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert p.stdout
try:
async for line in p.stdout:
try:
data = json.loads(line)
except ValueError as e:
log.error('Failed to parse journal entry: %s', e)
continue
try:
yield data
except Exception:
log.exception('Error handling message:')
continue
except asyncio.CancelledError:
log.debug('Terminating child process')
p.terminate()
try:
await asyncio.wait_for(p.wait(), 10)
except asyncio.TimeoutError:
log.warning('Child process did not terminate when asked, killing')
p.kill()
await p.wait()
raise
async def on_message(data: Json) -> None:
message = data.get('MESSAGE')
if not message:
log.debug('Ignoring journal record with empty message')
return
try:
timestamp = datetime.datetime.fromtimestamp(
int(data['__REALTIME_TIMESTAMP']) / 1e6
)
except (KeyError, ValueError) as e:
log.warning('Could not parse message timestamp: %s', e)
timestamp = datetime.datetime.now()
try:
hostname = data['_HOSTNAME']
except KeyError:
hostname = os.uname().nodename
try:
ident = data['SYSLOG_IDENTIFIER']
except KeyError:
ident = data.get('_TRANSPORT', '')
message = (
f'{timestamp.strftime("%b %d %H:%M:%S")} {hostname} {ident}: '
f'{message}'
)
try:
priority = int(data['PRIORITY'])
except (KeyError, ValueError):
priority = 0
if priority <= syslog.LOG_CRIT:
tag = 'rotating_light'
elif priority == syslog.LOG_ERR:
tag = 'red_circle'
elif priority == syslog.LOG_WARNING:
tag = 'warning'
elif priority == syslog.LOG_NOTICE:
tag = 'pushpin'
elif priority == syslog.LOG_INFO:
tag = 'information_source'
else:
tag = 'white_circle'
req = urllib.request.Request(
f'{NTFY_URL}/{NTFY_TOPIC}',
data=message.encode('utf-8'),
headers={
'Tags': tag,
'Content-Type': 'text/plain; charset=utf-8',
},
)
try:
# The lock ensures that messages are delivered in the order
# they were read from the journal, while `to_thread` keeps the
# network request from blocking the event loop.
async with ntfy_lock:
res = await asyncio.to_thread(urllib.request.urlopen, req)
except urllib.error.URLError as e:
log.error('Could not send ntfy notification: %s', e)
return
if res.status == 200:
log.debug('Successfully sent ntfy notification')
else:
log.error('Sending ntfy notification failed: %s', res.reason)
def shutdown(signum: int) -> None:
log.info('Received %s, shutting down', signal.Signals(signum).name)
cur_task = asyncio.current_task()
for task in asyncio.all_tasks():
if task is not cur_task:
task.cancel()
async def main():
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, shutdown, signal.SIGTERM)
loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT)
if len(sys.argv) > 1:
filters = sys.argv[1:]
else:
filters = shlex.split(os.environ.get('JOURNAL2NTFY_FILTERS', ''))
try:
async for data in follow_journal(*filters):
await on_message(data)
except asyncio.CancelledError:
return
if __name__ == '__main__':
asyncio.run(main())