journal2ntfy: Script to send log messagess via ntfy

The `journal2ntfy.py` script follows the systemd journal by spawning
`journalctl` as a child process and reading from its standard output
stream.  Any command-line arguments passed to `journal2ntfy` are passed
to `journalctl`, which allows the caller to specify message filters.
For any matching journal message, `journal2ntfy` sends a message via
the *ntfy* web service.

For the BURP server, we're going to use `journal2ntfy` to generate
alerts about the RAID array.  When I reconnect the disk that was in the
fireproof safe, the kernel will log a message from the *md* subsystem
indicating that the resynchronization process has begun.  Then, when
the disks are again in sync, it will log another message, which will
let me know it is safe to archive the other disk.
step-ssh
Dustin 2023-05-17 14:51:21 -05:00
parent 2c002aa7c5
commit a7319c561d
8 changed files with 265 additions and 0 deletions

View File

@ -3,3 +3,5 @@ burp_notify: gyrfalcon@ebonfire.com
collectd_processes:
- name: burp
journal2ntfy_filters: _TRANSPORT=kernel --grep ^md

3
hosts
View File

@ -74,6 +74,9 @@ hass2.pyrocufflink.blue
[jenkins-slave]
[journal2ntfy:children]
burp-server
[k8s-controller]
k8s-ctrl0.pyrocufflink.blue

4
journal2ntfy.yml Normal file
View File

@ -0,0 +1,4 @@
- hosts: journal2ntfy
roles:
- role: journal2ntfy
tags: journal2ntfy

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
import asyncio
import datetime
import json
import logging
import os
import shlex
import signal
import sys
import syslog
import urllib.error
import urllib.request
from typing import Any, AsyncIterator
log = logging.getLogger('journal2ntfy')
Json = dict[str, Any]
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'alerts')
ntfy_lock = asyncio.Lock()
async def follow_journal(*filters: str) -> AsyncIterator[Json]:
cmd = [
'journalctl',
'--since=now',
'--output=json',
'--follow',
]
cmd += filters
if log.isEnabledFor(logging.DEBUG):
log.debug(
'Running command: %s', ' '.join(shlex.quote(str(a)) for a in cmd)
)
p = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert p.stdout
try:
async for line in p.stdout:
try:
data = json.loads(line)
except ValueError as e:
log.error('Failed to parse journal entry: %s', e)
continue
try:
yield data
except Exception:
log.exception('Error handling message:')
continue
except asyncio.CancelledError:
log.debug('Terminating child process')
p.terminate()
try:
await asyncio.wait_for(p.wait(), 10)
except asyncio.TimeoutError:
log.warning('Child process did not terminate when asked, killing')
p.kill()
await p.wait()
raise
async def on_message(data: Json) -> None:
message = data.get('MESSAGE')
if not message:
log.debug('Ignoring journal record with empty message')
return
try:
timestamp = datetime.datetime.fromtimestamp(
int(data['__REALTIME_TIMESTAMP']) / 1e6
)
except (KeyError, ValueError) as e:
log.warning('Could not parse message timestamp: %s', e)
timestamp = datetime.datetime.now()
try:
hostname = data['_HOSTNAME']
except KeyError:
hostname = os.uname().nodename
try:
ident = data['SYSLOG_IDENTIFIER']
except KeyError:
ident = data.get('_TRANSPORT', '')
message = (
f'{timestamp.strftime("%b %d %H:%M:%S")} {hostname} {ident}: '
f'{message}'
)
try:
priority = int(data['PRIORITY'])
except (KeyError, ValueError):
priority = 0
if priority <= syslog.LOG_CRIT:
tag = 'rotating_light'
elif priority == syslog.LOG_ERR:
tag = 'red_circle'
elif priority == syslog.LOG_WARNING:
tag = 'warning'
elif priority == syslog.LOG_NOTICE:
tag = 'pushpin'
elif priority == syslog.LOG_INFO:
tag = 'information_source'
else:
tag = 'white_circle'
req = urllib.request.Request(
f'{NTFY_URL}/{NTFY_TOPIC}',
data=message.encode('utf-8'),
headers={
'Tags': tag,
'Content-Type': 'text/plain; charset=utf-8',
},
)
try:
# The lock ensures that messages are delivered in the order
# they were read from the journal, while `to_thread` keeps the
# network request from blocking the event loop.
async with ntfy_lock:
res = await asyncio.to_thread(urllib.request.urlopen, req)
except urllib.error.URLError as e:
log.error('Could not send ntfy notification: %s', e)
return
if res.status == 200:
log.debug('Successfully sent ntfy notification')
else:
log.error('Sending ntfy notification failed: %s', res.reason)
def shutdown(signum: int) -> None:
log.info('Received %s, shutting down', signal.Signals(signum).name)
cur_task = asyncio.current_task()
for task in asyncio.all_tasks():
if task is not cur_task:
task.cancel()
async def main():
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, shutdown, signal.SIGTERM)
loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT)
if len(sys.argv) > 1:
filters = sys.argv[1:]
else:
filters = shlex.split(os.environ.get('JOURNAL2NTFY_FILTERS', ''))
try:
async for data in follow_journal(*filters):
await on_message(data)
except asyncio.CancelledError:
return
if __name__ == '__main__':
asyncio.run(main())

View File

@ -0,0 +1,29 @@
[Unit]
Description=Send kernel messages from md via ntfy
Wants=network-online.target
After=network-online.target
[Service]
Type=exec
EnvironmentFile=-/etc/sysconfig/journal2ntfy
ExecStart=/usr/local/bin/journal2ntfy
DevicePolicy=closed
MemoryDenyWriteExecute=yes
PrivateDevices=yes
PrivateTmp=yes
PrivateUsers=yes
ProtectClock=yes
ProtectHome=yes
ProtectKernelLogs=yes
ProtectKernelModules=yes
ProtectKernelTunables=yes
ProtectProc=invisible
ProtectSystem=strict
RestrictRealtime=yes
RestrictSUIDSGID=yes
SystemCallFilter=@system-service
SystemCallFilter=~@privileged @resources
UMask=0077
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,4 @@
- name: restart journal2ntfy
service:
name: journal2ntfy
state: restarted

View File

@ -0,0 +1,51 @@
- name: ensure journal2ntfy script is installed
copy:
src: journal2ntfy.py
dest: /usr/local/bin/journal2ntfy
owner: root
group: root
mode: u=rwx,go=rx
notify:
- restart journal2ntfy
tags:
- install
- name: ensure journal2ntfy.service systemd unit is installed
copy:
src: journal2ntfy.service
dest: /etc/systemd/system/journal2ntfy.service
owner: root
group: root
mode: u=rw,go=r
notify:
- restart journal2ntfy
tags:
- systemd
- name: ensure journal2ntfy is configured
template:
src: journal2ntfy.env.j2
dest: /etc/sysconfig/journal2ntfy
owner: root
group: root
mode: u=rw,go=r
notify:
- restart journal2ntfy
tags:
- config
- name: ensure journal2ntfy service is enabled
service:
name: journal2ntfy
enabled: true
tags:
- service
- meta: flush_handlers
- name: ensure journal2ntfy is running
service:
name: journal2ntfy
state: started
tags:
- service

View File

@ -0,0 +1,3 @@
{% if journal2ntfy_filters|d(none) %}
JOURNAL2NTFY_FILTERS={{ journal2ntfy_filters }}
{% endif %}