Merge branch 'journal2ntfy'

step-ssh
Dustin 2023-05-23 08:31:52 -05:00
commit 78296f7198
8 changed files with 265 additions and 0 deletions

View File

@ -3,3 +3,5 @@ burp_notify: gyrfalcon@ebonfire.com
collectd_processes:
- name: burp
journal2ntfy_filters: _TRANSPORT=kernel --grep ^md

3
hosts
View File

@ -74,6 +74,9 @@ hass2.pyrocufflink.blue
[jenkins-slave]
[journal2ntfy:children]
burp-server
[k8s-controller]
k8s-ctrl0.pyrocufflink.blue

4
journal2ntfy.yml Normal file
View File

@ -0,0 +1,4 @@
- hosts: journal2ntfy
roles:
- role: journal2ntfy
tags: journal2ntfy

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
import asyncio
import datetime
import json
import logging
import os
import shlex
import signal
import sys
import syslog
import urllib.error
import urllib.request
from typing import Any, AsyncIterator
log = logging.getLogger('journal2ntfy')
Json = dict[str, Any]
NTFY_URL = os.environ.get('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'alerts')
ntfy_lock = asyncio.Lock()
async def follow_journal(*filters: str) -> AsyncIterator[Json]:
cmd = [
'journalctl',
'--since=now',
'--output=json',
'--follow',
]
cmd += filters
if log.isEnabledFor(logging.DEBUG):
log.debug(
'Running command: %s', ' '.join(shlex.quote(str(a)) for a in cmd)
)
p = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert p.stdout
try:
async for line in p.stdout:
try:
data = json.loads(line)
except ValueError as e:
log.error('Failed to parse journal entry: %s', e)
continue
try:
yield data
except Exception:
log.exception('Error handling message:')
continue
except asyncio.CancelledError:
log.debug('Terminating child process')
p.terminate()
try:
await asyncio.wait_for(p.wait(), 10)
except asyncio.TimeoutError:
log.warning('Child process did not terminate when asked, killing')
p.kill()
await p.wait()
raise
async def on_message(data: Json) -> None:
message = data.get('MESSAGE')
if not message:
log.debug('Ignoring journal record with empty message')
return
try:
timestamp = datetime.datetime.fromtimestamp(
int(data['__REALTIME_TIMESTAMP']) / 1e6
)
except (KeyError, ValueError) as e:
log.warning('Could not parse message timestamp: %s', e)
timestamp = datetime.datetime.now()
try:
hostname = data['_HOSTNAME']
except KeyError:
hostname = os.uname().nodename
try:
ident = data['SYSLOG_IDENTIFIER']
except KeyError:
ident = data.get('_TRANSPORT', '')
message = (
f'{timestamp.strftime("%b %d %H:%M:%S")} {hostname} {ident}: '
f'{message}'
)
try:
priority = int(data['PRIORITY'])
except (KeyError, ValueError):
priority = 0
if priority <= syslog.LOG_CRIT:
tag = 'rotating_light'
elif priority == syslog.LOG_ERR:
tag = 'red_circle'
elif priority == syslog.LOG_WARNING:
tag = 'warning'
elif priority == syslog.LOG_NOTICE:
tag = 'pushpin'
elif priority == syslog.LOG_INFO:
tag = 'information_source'
else:
tag = 'white_circle'
req = urllib.request.Request(
f'{NTFY_URL}/{NTFY_TOPIC}',
data=message.encode('utf-8'),
headers={
'Tags': tag,
'Content-Type': 'text/plain; charset=utf-8',
},
)
try:
# The lock ensures that messages are delivered in the order
# they were read from the journal, while `to_thread` keeps the
# network request from blocking the event loop.
async with ntfy_lock:
res = await asyncio.to_thread(urllib.request.urlopen, req)
except urllib.error.URLError as e:
log.error('Could not send ntfy notification: %s', e)
return
if res.status == 200:
log.debug('Successfully sent ntfy notification')
else:
log.error('Sending ntfy notification failed: %s', res.reason)
def shutdown(signum: int) -> None:
log.info('Received %s, shutting down', signal.Signals(signum).name)
cur_task = asyncio.current_task()
for task in asyncio.all_tasks():
if task is not cur_task:
task.cancel()
async def main():
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, shutdown, signal.SIGTERM)
loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT)
if len(sys.argv) > 1:
filters = sys.argv[1:]
else:
filters = shlex.split(os.environ.get('JOURNAL2NTFY_FILTERS', ''))
try:
async for data in follow_journal(*filters):
await on_message(data)
except asyncio.CancelledError:
return
if __name__ == '__main__':
asyncio.run(main())

View File

@ -0,0 +1,29 @@
[Unit]
Description=Send kernel messages from md via ntfy
Wants=network-online.target
After=network-online.target
[Service]
Type=exec
EnvironmentFile=-/etc/sysconfig/journal2ntfy
ExecStart=/usr/local/bin/journal2ntfy
DevicePolicy=closed
MemoryDenyWriteExecute=yes
PrivateDevices=yes
PrivateTmp=yes
PrivateUsers=yes
ProtectClock=yes
ProtectHome=yes
ProtectKernelLogs=yes
ProtectKernelModules=yes
ProtectKernelTunables=yes
ProtectProc=invisible
ProtectSystem=strict
RestrictRealtime=yes
RestrictSUIDSGID=yes
SystemCallFilter=@system-service
SystemCallFilter=~@privileged @resources
UMask=0077
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,4 @@
- name: restart journal2ntfy
service:
name: journal2ntfy
state: restarted

View File

@ -0,0 +1,51 @@
- name: ensure journal2ntfy script is installed
copy:
src: journal2ntfy.py
dest: /usr/local/bin/journal2ntfy
owner: root
group: root
mode: u=rwx,go=rx
notify:
- restart journal2ntfy
tags:
- install
- name: ensure journal2ntfy.service systemd unit is installed
copy:
src: journal2ntfy.service
dest: /etc/systemd/system/journal2ntfy.service
owner: root
group: root
mode: u=rw,go=r
notify:
- restart journal2ntfy
tags:
- systemd
- name: ensure journal2ntfy is configured
template:
src: journal2ntfy.env.j2
dest: /etc/sysconfig/journal2ntfy
owner: root
group: root
mode: u=rw,go=r
notify:
- restart journal2ntfy
tags:
- config
- name: ensure journal2ntfy service is enabled
service:
name: journal2ntfy
enabled: true
tags:
- service
- meta: flush_handlers
- name: ensure journal2ntfy is running
service:
name: journal2ntfy
state: started
tags:
- service

View File

@ -0,0 +1,3 @@
{% if journal2ntfy_filters|d(none) %}
JOURNAL2NTFY_FILTERS={{ journal2ntfy_filters }}
{% endif %}