"""Webhook service linking Firefly III, Paperless-ngx, ntfy, Jenkins, AMQP
and Kubernetes.

Endpoints (route handlers are at the bottom of this module):

* ``POST /hooks/firefly-iii/create`` -- sends an ntfy notification for a new
  Firefly III transaction and attaches matching Paperless-ngx receipts to
  each of its splits.
* ``POST /hooks/jenkins`` -- forwards Jenkins run start/finish events to ntfy.
* ``POST /host/online`` -- publishes host details to an AMQP queue and
  creates a Kubernetes Job from the ``ANSIBLE_JOB_YAML`` template.
"""

import asyncio
import base64
import datetime
import functools
import importlib.metadata
import json
import logging
import os
import re
import ssl
from pathlib import Path
from types import TracebackType
from typing import Any, Optional, Self, Type

import fastapi
import httpx
import pika
import pika.channel
import pika.credentials
import pydantic
import pyrfc6266
import ruamel.yaml
from fastapi import Form
from pika.adapters.asyncio_connection import AsyncioConnection

__all__ = [
    'app',
]

log = logging.getLogger(__name__)

# Assigned by the startup handler (on_start).
context: 'Context'

DIST = importlib.metadata.metadata(__name__)

# Characters removed from (lower-cased) transaction descriptions before they
# are used as a Paperless search query.
DESCRIPTION_CLEAN_PATTERN = re.compile('[^a-z ]')

# Words dropped from transaction descriptions before searching Paperless.
EXCLUDE_DESCRIPTION_WORDS = {
    'a',
    'ach',
    'an',
    'card',
    'debit',
    'pay',
    'payment',
    'purchase',
    'retail',
    'the',
}

FIREFLY_URL = os.environ.get(
    'FIREFLY_URL',
    'http://firefly-iii',
)

# Documents larger than this many bytes are not attached to transactions.
MAX_DOCUMENT_SIZE = int(
    os.environ.get(
        'MAX_DOCUMENT_SIZE',
        50 * 2**20,
    )
)

NTFY_URL = os.environ.get('NTFY_URL', 'http://ntfy.ntfy:2586')

PAPERLESS_URL = os.environ.get(
    'PAPERLESS_URL',
    'http://paperless-ngx',
)

ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml'))
ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible')

HOST_INFO_QUEUE = os.environ.get('HOST_INFO_QUEUE', 'host-provisioner')
# Sent as the AMQP message ``expiration`` property (brokers such as RabbitMQ
# interpret it as milliseconds).
HOST_INFO_TTL = 600000

KUBERNETES_TOKEN_PATH = Path(
    os.environ.get(
        'KUBERNETES_TOKEN_PATH',
        '/run/secrets/kubernetes.io/serviceaccount/token',
    )
)
KUBERNETES_CA_CERT = Path(
    os.environ.get(
        'KUBERNETES_CA_CERT',
        '/run/secrets/kubernetes.io/serviceaccount/ca.crt',
    )
)

# ntfy tag (emoji shortcode) for each Jenkins build result; results not
# listed here use 'white_circle'.
_JENKINS_RESULT_TAGS = {
    'FAILURE': 'red_circle',
    'SUCCESS': 'green_circle',
    'UNSTABLE': 'yellow_circle',
    'NOT_BUILT': 'large_blue_circle',
    'ABORTED': 'black_circle',
}


class FireflyIIITransactionSplit(pydantic.BaseModel):
    """One split of a Firefly III transaction (only the fields used here)."""

    type: str
    date: datetime.datetime
    amount: str
    transaction_journal_id: int
    description: str


class FireflyIIITransaction(pydantic.BaseModel):
    """A Firefly III transaction: a list of splits."""

    transactions: list[FireflyIIITransactionSplit]


class FireflyIIIWebhook(pydantic.BaseModel):
    """Body of a Firefly III webhook request."""

    content: FireflyIIITransaction


class PaperlessNgxDocument(pydantic.BaseModel):
    """A Paperless-ngx document search hit (only the fields used here)."""

    id: int
    title: str


class PaperlessNgxSearchResults(pydantic.BaseModel):
    """One page of Paperless-ngx document search results."""

    count: int
    next: str | None
    previous: str | None
    results: list[PaperlessNgxDocument]


def _read_token_file(env_var: str, description: str) -> Optional[str]:
    """Return the stripped contents of the file named by ``$env_var``.

    Returns ``None`` when the variable is unset/empty or the file cannot be
    read (the failure is logged as "Could not load <description>: ...").
    """
    token_file = os.environ.get(env_var)
    if not token_file:
        return None
    try:
        with open(token_file, encoding='utf-8') as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        log.error('Could not load %s: %s', description, e)
        return None


class HttpxClientMixin:
    """Mixin providing a lazily created :class:`httpx.AsyncClient`.

    Subclasses customise the client by overriding :meth:`_get_client`.
    Using an instance as an async context manager closes the client on exit;
    a fresh client is created the next time :attr:`client` is accessed.
    """

    def __init__(self) -> None:
        super().__init__()
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> Self:
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Drop the reference so a closed client is never reused.
        client, self._client = self._client, None
        if client is not None:
            await client.__aexit__(exc_type, exc_value, tb)

    @property
    def client(self) -> httpx.AsyncClient:
        """The HTTP client, created on first access."""
        if self._client is None:
            self._client = self._get_client()
        return self._client

    def _get_client(self, **kwargs: Any) -> httpx.AsyncClient:
        """Create the client, adding a ``User-Agent`` from package metadata.

        ``kwargs`` are passed to :class:`httpx.AsyncClient`.
        """
        # Copy so a caller-supplied headers mapping is not mutated.
        headers = dict(kwargs.get('headers') or {})
        headers['User-Agent'] = f'{DIST["Name"]}/{DIST["Version"]}'
        kwargs['headers'] = headers
        return httpx.AsyncClient(**kwargs)


class Firefly(HttpxClientMixin):
    """Minimal Firefly III API client."""

    def _get_client(self, **kwargs: Any) -> httpx.AsyncClient:
        client = super()._get_client(**kwargs)
        # FIREFLY_AUTH_TOKEN names a file containing the token.
        token = _read_token_file(
            'FIREFLY_AUTH_TOKEN', 'Firefly-III access token'
        )
        if token:
            client.headers['Authorization'] = f'Bearer {token}'
        return client

    async def attach_receipt(
        self,
        xact_id: int,
        doc: bytes,
        filename: str,
        title: str | None = None,
    ) -> None:
        """Attach *doc* to the Firefly III transaction journal *xact_id*.

        Creates the attachment record, then uploads its content.

        Raises:
            httpx.HTTPStatusError: if either API request fails.
        """
        log.info('Attaching receipt %r to transaction %d', filename, xact_id)
        data: dict[str, Any] = {
            'filename': filename,
            'attachable_type': 'TransactionJournal',
            'attachable_id': xact_id,
        }
        if title:
            data['title'] = title
        r = await self.client.post(
            f'{FIREFLY_URL}/api/v1/attachments', data=data
        )
        r.raise_for_status()
        attachment_id = r.json()['data']['id']
        r = await self.client.post(
            f'{FIREFLY_URL}/api/v1/attachments/{attachment_id}/upload',
            content=doc,
            headers={
                'Content-Type': 'application/octet-stream',
            },
        )
        r.raise_for_status()


class Paperless(HttpxClientMixin):
    """Minimal Paperless-ngx API client."""

    def _get_client(self, **kwargs: Any) -> httpx.AsyncClient:
        client = super()._get_client(**kwargs)
        # PAPERLESS_AUTH_TOKEN names a file containing the token.
        token = _read_token_file(
            'PAPERLESS_AUTH_TOKEN', 'Paperless-ngx authentication token'
        )
        if token:
            client.headers['Authorization'] = f'Token {token}'
        return client

    async def find_receipts(
        self, search: str, amount: float, date: datetime.date
    ) -> list[tuple[str, str, bytes]]:
        """Find and download receipts matching a transaction.

        Searches for Invoice/Receipt documents matching *search* and *amount*
        created within two days of *date*.  Only the first page of results is
        used; documents that fail to download, have no usable
        ``Content-Length``, or exceed ``MAX_DOCUMENT_SIZE`` are skipped.

        Returns:
            ``(filename, title, content)`` for each downloaded document; an
            empty list if the search fails (errors are logged).

        Raises:
            httpx.HTTPError: on transport-level failures.
        """
        date_begin = date - datetime.timedelta(days=2)
        date_end = date + datetime.timedelta(days=2)
        query = ' '.join(
            (
                search,
                str(amount),
                'type:Invoice/Receipt',
                f'created:[{date_begin} TO {date_end}]',
            )
        )
        log.info('Searching for receipt in Paperless: %s', query)
        docs: list[tuple[str, str, bytes]] = []
        for doc in await self._search(query):
            if (receipt := await self._download(doc)) is not None:
                docs.append(receipt)
        return docs

    async def _search(self, query: str) -> list[PaperlessNgxDocument]:
        """Run a document search; return ``[]`` (after logging) on error."""
        url = f'{PAPERLESS_URL}/api/documents/'
        r = await self.client.get(url, params={'query': query})
        if r.status_code != 200:
            if log.isEnabledFor(logging.ERROR):
                try:
                    data = r.json()
                except ValueError as e:
                    log.debug(
                        'Failed to parse HTTP error response as JSON: %s', e
                    )
                    detail = r.text
                else:
                    detail = (
                        data.get('detail', '') if isinstance(data, dict) else ''
                    )
                log.error(
                    'Error searching Paperless: HTTP %d %s: %s',
                    r.status_code,
                    r.reason_phrase,
                    detail,
                )
            return []
        try:
            data = r.json()
        except ValueError as e:
            log.error('Failed to parse HTTP response as JSON: %s', e)
            return []
        try:
            results = PaperlessNgxSearchResults.parse_obj(data)
        except pydantic.ValidationError as e:
            log.error('Could not parse search response: %s', e)
            return []
        log.info('Search returned %d documents', results.count)
        if results.next:
            log.warning(
                'Search returned multiple pages of results; '
                'only the results on the first page are used'
            )
        return results.results

    async def _download(
        self, doc: PaperlessNgxDocument
    ) -> Optional[tuple[str, str, bytes]]:
        """Download the original file of *doc*.

        The response is streamed so the size limit is checked from the
        ``Content-Length`` header *before* the body is read into memory.
        Returns ``None`` (after logging) if the document is skipped.
        """
        url = f'{PAPERLESS_URL}/api/documents/{doc.id}/download/'
        async with self.client.stream(
            'GET', url, params={'original': True}
        ) as r:
            if r.status_code != 200:
                log.error(
                    'Failed to download document: HTTP %d %s',
                    r.status_code,
                    r.reason_phrase,
                )
                return None
            try:
                size = int(r.headers['Content-Length'])
            except (KeyError, ValueError) as e:
                log.error(
                    'Skipping document ID %d: Cannot determine file size: %s',
                    doc.id,
                    e,
                )
                return None
            if size > MAX_DOCUMENT_SIZE:
                log.warning(
                    'Skipping document ID %d: Size (%d bytes) is greater than '
                    'the configured maximum document size (%d bytes)',
                    doc.id,
                    size,
                    MAX_DOCUMENT_SIZE,
                )
                return None
            return response_filename(r), doc.title, await r.aread()


class AMQPError(Exception):
    """An AMQP connection, channel, or operation failed."""


class AMQPContext:
    """A lazily opened AMQP connection with a single channel, on asyncio."""

    def __init__(
        self,
        conn_params: Optional[
            pika.ConnectionParameters | pika.URLParameters
        ] = None,
    ) -> None:
        self._conn_params = conn_params
        self._connection: Optional[AsyncioConnection] = None
        self._channel: Optional[pika.channel.Channel] = None
        # Serialises connect() so concurrent callers neither open a second
        # connection/channel nor use a connection that is still opening.
        self._lock = asyncio.Lock()
        # Futures waiting for a broker callback; they are failed when the
        # channel or connection closes so callers do not wait forever.
        self._pending: set[asyncio.Future[Any]] = set()

    @classmethod
    def from_env(cls) -> Self:
        """Build connection parameters from environment variables.

        If ``AMQP_URL`` is set it is used as-is.  Otherwise the parameters
        come from ``AMQP_HOST``, ``AMQP_PORT``, ``AMQP_VIRTUAL_HOST``,
        ``AMQP_USERNAME``/``AMQP_PASSWORD`` (plain credentials) or
        ``AMQP_EXTERNAL_CREDENTIALS``, and TLS is enabled when any of
        ``AMQP_CA_CERT``, ``AMQP_CLIENT_CERT`` or ``AMQP_CLIENT_KEY`` is set
        (``AMQP_CLIENT_KEY_PASSWORD`` unlocks the client key).
        """
        if 'AMQP_URL' in os.environ:
            params = pika.URLParameters(os.environ['AMQP_URL'])
        else:
            kwargs: dict[str, Any] = {}
            if host := os.environ.get('AMQP_HOST'):
                kwargs['host'] = host
            if port := os.environ.get('AMQP_PORT'):
                kwargs['port'] = int(port)
            if vhost := os.environ.get('AMQP_VIRTUAL_HOST'):
                kwargs['virtual_host'] = vhost
            if username := os.environ.get('AMQP_USERNAME'):
                password = os.environ.get('AMQP_PASSWORD', '')
                kwargs['credentials'] = pika.PlainCredentials(
                    username, password
                )
            elif os.environ.get('AMQP_EXTERNAL_CREDENTIALS'):
                kwargs['credentials'] = pika.credentials.ExternalCredentials()
            if (
                'AMQP_CA_CERT' in os.environ
                or 'AMQP_CLIENT_CERT' in os.environ
                or 'AMQP_CLIENT_KEY' in os.environ
            ):
                sslctx = ssl.create_default_context(
                    cafile=os.environ.get('AMQP_CA_CERT')
                )
                if certfile := os.environ.get('AMQP_CLIENT_CERT'):
                    keyfile = os.environ.get('AMQP_CLIENT_KEY')
                    keypassword = os.environ.get('AMQP_CLIENT_KEY_PASSWORD')
                    sslctx.load_cert_chain(certfile, keyfile, keypassword)
                kwargs['ssl_options'] = pika.SSLOptions(
                    sslctx, kwargs.get('host')
                )
            params = pika.ConnectionParameters(**kwargs)
        return cls(params)

    def close(self) -> None:
        """Close the channel and connection if they are still open."""
        channel, self._channel = self._channel, None
        # pika rejects close() on objects that are already closing/closed.
        if channel is not None and not (channel.is_closing or channel.is_closed):
            channel.close()
        connection, self._connection = self._connection, None
        if connection is not None and not (
            connection.is_closing or connection.is_closed
        ):
            connection.close()

    async def connect(self) -> None:
        """Open the connection and channel unless already open.

        Raises:
            AMQPError: if the connection cannot be opened, or the channel is
                closed before it finishes opening.
        """
        async with self._lock:
            loop = asyncio.get_running_loop()
            if self._connection is None:
                fut: asyncio.Future[None] = loop.create_future()
                conn = AsyncioConnection(
                    self._conn_params,
                    on_open_callback=functools.partial(self._on_open, fut),
                    on_open_error_callback=functools.partial(
                        self._on_open_error, fut
                    ),
                    on_close_callback=self._on_close,
                )
                self._connection = conn
                try:
                    await fut
                except BaseException:
                    # Never keep a connection that failed (or was abandoned)
                    # while opening.
                    if self._connection is conn:
                        self._connection = None
                    raise
            if self._channel is None:
                if self._connection is None:
                    raise AMQPError('AMQP connection closed while opening')
                chan_fut: asyncio.Future[pika.channel.Channel] = (
                    loop.create_future()
                )
                self._pending.add(chan_fut)
                try:
                    self._connection.channel(
                        on_open_callback=functools.partial(
                            self._on_channel_open, chan_fut
                        ),
                    )
                    self._channel = await chan_fut
                finally:
                    self._pending.discard(chan_fut)

    def publish(
        self,
        exchange: str,
        routing_key: str,
        body: bytes,
        properties: Optional[pika.BasicProperties] = None,
    ) -> None:
        """Publish *body* on the open channel.

        Raises:
            AMQPError: if :meth:`connect` has not opened a channel.
        """
        if self._channel is None:
            raise AMQPError('AMQP channel is not open')
        self._channel.basic_publish(
            exchange,
            routing_key,
            body,
            properties,
        )

    async def queue_declare(
        self,
        name: str,
        passive: bool = False,
        durable: bool = False,
        exclusive: bool = False,
        auto_delete: bool = False,
        arguments: Optional[dict[str, Any]] = None,
    ) -> None:
        """Declare queue *name* and wait for the broker to confirm it.

        Raises:
            AMQPError: if no channel is open, or the channel/connection
                closes before the declaration is confirmed (e.g. when the
                broker rejects it).
        """
        if self._channel is None:
            raise AMQPError('AMQP channel is not open')
        fut: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._pending.add(fut)
        try:
            self._channel.queue_declare(
                name,
                passive,
                durable,
                exclusive,
                auto_delete,
                arguments,
                callback=lambda _frame: (
                    None if fut.done() else fut.set_result(None)
                ),
            )
            await fut
        finally:
            self._pending.discard(fut)

    def _fail_pending(self, reason: object) -> None:
        """Fail every future still waiting for a broker callback."""
        for fut in list(self._pending):
            if not fut.done():
                fut.set_exception(AMQPError(reason))

    def _on_open(self, fut: asyncio.Future[None], _conn: object) -> None:
        log.info('AMQP connection open')
        if not fut.done():
            fut.set_result(None)

    def _on_open_error(
        self, fut: asyncio.Future[None], conn: object, error: object
    ) -> None:
        log.error('Failed to open AMQP connection: %s', error)
        if self._connection is conn:
            self._connection = None
        if not fut.done():
            fut.set_exception(AMQPError(error))

    def _on_close(self, conn: object, reason: object) -> None:
        # Not every close reason pika reports (e.g. a lost stream) carries
        # reply_code/reply_text.
        code = getattr(reason, 'reply_code', None)
        text = getattr(reason, 'reply_text', reason)
        level = logging.INFO if code == 200 else logging.WARNING
        log.log(level, 'AMQP connection closed: %s (code %s)', text, code)
        if self._connection is conn:
            self._connection = None
            # A channel cannot outlive its connection.
            self._channel = None
        self._fail_pending(reason)

    def _on_channel_open(
        self,
        fut: asyncio.Future[pika.channel.Channel],
        chan: pika.channel.Channel,
    ) -> None:
        chan.add_on_close_callback(self._on_channel_close)
        if not fut.done():
            fut.set_result(chan)

    def _on_channel_close(self, chan: object, reason: object) -> None:
        code = getattr(reason, 'reply_code', None)
        text = getattr(reason, 'reply_text', reason)
        level = logging.INFO if code == 0 else logging.WARNING
        log.log(level, 'AMQP channel closed: %s (code %s)', text, code)
        if self._channel is None or self._channel is chan:
            self._channel = None
            self._fail_pending(reason)


class Kubernetes(HttpxClientMixin):
    """Minimal Kubernetes API client.

    Authenticates with the bearer token read from ``KUBERNETES_TOKEN_PATH``
    and verifies TLS with ``KUBERNETES_CA_CERT`` when that file exists.
    """

    @functools.cached_property
    def base_url(self) -> str:
        """API server URL from the ``KUBERNETES_SERVICE_*`` variables.

        Uses HTTPS when ``KUBERNETES_SERVICE_PORT_HTTPS`` is set, otherwise
        plain HTTP on ``KUBERNETES_SERVICE_PORT`` (default 8001).
        """
        port = os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS')
        scheme = 'https'
        if not port:
            scheme = 'http'
            port = os.environ.get('KUBERNETES_SERVICE_PORT', 8001)
        host = os.environ.get('KUBERNETES_SERVICE_HOST', '127.0.0.1')
        url = f'{scheme}://{host}:{port}'
        log.info('Using Kubernetes URL: %s', url)
        return url

    @functools.cached_property
    def token(self) -> str:
        """Bearer token read from ``KUBERNETES_TOKEN_PATH``."""
        return KUBERNETES_TOKEN_PATH.read_text(encoding='utf-8').strip()

    def _get_client(self, **kwargs: Any) -> httpx.AsyncClient:
        if KUBERNETES_CA_CERT.exists():
            kwargs.setdefault('verify', str(KUBERNETES_CA_CERT))
        client = super()._get_client(**kwargs)
        try:
            client.headers['Authorization'] = f'Bearer {self.token}'
        except (OSError, UnicodeDecodeError) as e:
            log.warning('Failed to read k8s auth token: %s', e)
        return client


class Context:
    """Application-wide state created at startup."""

    def __init__(self) -> None:
        self.amqp = AMQPContext.from_env()


async def handle_firefly_transaction(xact: FireflyIIITransaction) -> None:
    """Notify about a new transaction and attach receipts to its splits.

    The ntfy notification describes the first split.  Failures (notification,
    receipt search, attachment) are logged rather than raised.
    """
    if not xact.transactions:
        log.warning('Received empty transaction Firefly?')
        return

    xact0 = xact.transactions[0]
    message = f'${xact0.amount} for {xact0.description} on {xact0.date.date()}'
    title = f'Firefly III: New {xact0.type.title()}'
    try:
        await ntfy(message, 'firefly', title, 'money_with_wings')
    except Exception:
        log.exception('Failed to send notification')

    async with Firefly() as ff, Paperless() as pl:
        for split in xact.transactions:
            await _attach_split_receipts(ff, pl, split)


async def _attach_split_receipts(
    ff: Firefly, pl: Paperless, split: FireflyIIITransactionSplit
) -> None:
    """Search Paperless for receipts of *split* and attach them in Firefly."""
    search = clean_description(split.description)
    try:
        amount = float(split.amount)
    except ValueError as e:
        log.error('Invalid transaction amount: %s', e)
        return
    try:
        receipts = await pl.find_receipts(search, amount, split.date.date())
    except httpx.HTTPError as e:
        # Keep going with the remaining splits on network errors.
        log.error('Failed to search Paperless for receipts: %s', e)
        return
    for filename, title, doc in receipts:
        try:
            await ff.attach_receipt(
                split.transaction_journal_id, doc, filename, title
            )
        except Exception as e:
            log.error(
                'Failed to attach receipt to transaction ID %d: %s',
                split.transaction_journal_id,
                e,
            )


def clean_description(text: str) -> str:
    """Reduce a transaction description to search terms.

    Lower-cases *text*, removes characters matched by
    ``DESCRIPTION_CLEAN_PATTERN``, drops ``EXCLUDE_DESCRIPTION_WORDS`` and
    duplicate words, and joins the rest with spaces in their original order.
    If nothing remains after removing characters, *text* is returned as-is.
    """
    cleaned = DESCRIPTION_CLEAN_PATTERN.sub('', text.lower())
    if not cleaned:
        log.warning(
            'Failed to clean transaction description: '
            'text did not match regular expression pattern'
        )
        return text
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # query is deterministic (set iteration order of str is not).
    terms = dict.fromkeys(
        word for word in cleaned.split() if word not in EXCLUDE_DESCRIPTION_WORDS
    )
    return ' '.join(terms)


def response_filename(response: httpx.Response) -> str:
    """Best-effort file name for a download response.

    Prefers ``filename*`` then ``filename`` from ``Content-Disposition``
    (stripping a ``b'...'`` wrapper from the latter); otherwise uses the last
    segment of the request URL path.
    """
    if cd := response.headers.get('Content-Disposition'):
        _disposition, params = pyrfc6266.parse(cd)
        maybename = ''
        for p in params:
            if p.name == 'filename*':
                return p.value
            if p.name == 'filename':
                maybename = p.value
        if maybename:
            if maybename.startswith("b'") and maybename.endswith("'"):
                maybename = maybename[2:-1]
            return maybename
    return response.url.path.rstrip('/').rsplit('/', 1)[-1]


def _ntfy_header_value(value: str) -> str:
    """Make *value* safe for an HTTP header sent to ntfy.

    Non-ASCII text is RFC 2047 encoded; ASCII text has newlines escaped as
    ``\\n``.
    """
    try:
        value.encode('ascii')
    except UnicodeEncodeError:
        return rfc2047_base64encode(value)
    return value.replace('\n', '\\n')


async def ntfy(
    message: Optional[str],
    topic: str,
    title: Optional[str] = None,
    tags: Optional[str] = None,
    attach: Optional[bytes] = None,
    filename: Optional[str] = None,
    cache: Optional[bool] = None,
) -> None:
    """Publish a notification to *topic* on the ntfy server at ``NTFY_URL``.

    Without *attach* the message is POSTed as the body.  With *attach* the
    bytes are PUT as the body and *message* goes in the ``Message`` header.

    Raises:
        ValueError: if neither *message* nor *attach* is given.
        httpx.HTTPError: if the request fails.
    """
    if not (message or attach):
        raise ValueError('ntfy requires a message or an attachment')
    headers: dict[str, str] = {}
    if title:
        headers['Title'] = _ntfy_header_value(title)
    if tags:
        headers['Tags'] = tags
    if cache is not None:
        headers['Cache'] = 'yes' if cache else 'no'
    url = f'{NTFY_URL}/{topic}'
    async with httpx.AsyncClient() as client:
        if attach:
            if filename:
                headers['Filename'] = filename
            if message:
                headers['Message'] = _ntfy_header_value(message)
            r = await client.put(url, headers=headers, content=attach)
        else:
            r = await client.post(url, headers=headers, content=message)
        r.raise_for_status()


def rfc2047_base64encode(
    message: str,
) -> str:
    """Encode *message* as an RFC 2047 UTF-8 base64 encoded-word."""
    encoded = base64.b64encode(message.encode("utf-8")).decode("ascii")
    return f"=?UTF-8?B?{encoded}?="


def load_job_yaml(path: Optional[Path] = None) -> dict[str, Any]:
    """Load the Job manifest from *path* (default ``ANSIBLE_JOB_YAML``)."""
    if path is None:
        path = ANSIBLE_JOB_YAML
    yaml = ruamel.yaml.YAML()
    with path.open(encoding='utf-8') as f:
        return yaml.load(f)


async def start_ansible_job() -> None:
    """Create a Job in ``ANSIBLE_JOB_NAMESPACE`` from the YAML manifest.

    Raises:
        Exception: if the API server responds with a non-2xx status.
    """
    async with Kubernetes() as kube:
        url = (
            f'{kube.base_url}/apis/batch/v1/namespaces/'
            f'{ANSIBLE_JOB_NAMESPACE}/jobs'
        )
        job = load_job_yaml()
        r = await kube.client.post(url, json=job)
        if r.status_code > 299:
            raise Exception(
                f'Failed to create Ansible job: HTTP {r.status_code} '
                f'{r.reason_phrase}: {r.text}'
            )


async def publish_host_info(
    hostname: str, sshkeys: str, branch: Optional[str]
) -> None:
    """Publish host details as JSON to the durable ``HOST_INFO_QUEUE``."""
    data = {
        'hostname': hostname,
        'sshkeys': sshkeys,
    }
    if branch:
        data['branch'] = branch
    await context.amqp.connect()
    await context.amqp.queue_declare(HOST_INFO_QUEUE, durable=True)
    properties = pika.BasicProperties(expiration=str(HOST_INFO_TTL))
    context.amqp.publish(
        exchange='',
        routing_key=HOST_INFO_QUEUE,
        body=json.dumps(data).encode('utf-8'),
        properties=properties,
    )


async def handle_host_online(
    hostname: str, sshkeys: str, branch: Optional[str]
) -> None:
    """Publish host info, then start the Ansible job; errors are logged.

    The job is not started if publishing fails.
    """
    try:
        await publish_host_info(hostname, sshkeys, branch)
    except asyncio.CancelledError:
        raise
    except Exception:
        log.exception('Failed to publish host info:')
        return
    try:
        await start_ansible_job()
    except asyncio.CancelledError:
        raise
    except Exception:
        log.exception('Failed to start Ansible job:')


app = fastapi.FastAPI(
    title=DIST['Name'],
    version=DIST['Version'],
    docs_url='/api-doc/',
)


@app.on_event('startup')
def on_start() -> None:
    log.setLevel(logging.DEBUG)
    h = logging.StreamHandler()
    h.setLevel(logging.DEBUG)
    log.addHandler(h)
    global context
    context = Context()


@app.on_event('shutdown')
def on_shutdown() -> None:
    context.amqp.close()


@app.get('/')
def status() -> str:
    return 'UP'


@app.post('/hooks/firefly-iii/create')
async def firefly_iii_create(hook: FireflyIIIWebhook) -> None:
    await handle_firefly_transaction(hook.content)


@app.post('/hooks/jenkins')
async def jenkins_notify(request: fastapi.Request) -> None:
    """Forward Jenkins ``run.started``/``run.finalized`` events to ntfy.

    Other event types are ignored.
    """
    body = await request.json()
    data = body.get('data') or {}
    event = body.get('type')
    if event == 'run.started':
        title = 'Build started'
        tag = 'building_construction'
        # First non-empty cause description across all actions, if any.
        causes = (
            cause.get('shortDescription')
            for action in data.get('actions', [])
            for cause in action.get('causes', [])
        )
        build_cause = next(filter(None, causes), None)
        message = f'Build started: {data["fullDisplayName"]}'
        if build_cause:
            message = f'{message} ({build_cause})'
    elif event == 'run.finalized':
        result = data['result']
        message = f'{data["fullDisplayName"]} {result}'
        title = 'Build finished'
        tag = _JENKINS_RESULT_TAGS.get(result, 'white_circle')
    else:
        return
    try:
        await ntfy(message, 'jenkins', title, tag, cache=False)
    except httpx.HTTPError:
        log.exception('Failed to send notification:')


# Strong references to fire-and-forget tasks: the event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task[None]] = set()


@app.post(
    '/host/online',
    status_code=fastapi.status.HTTP_202_ACCEPTED,
    response_class=fastapi.responses.PlainTextResponse,
)
async def host_online(
    hostname: str = Form(),
    sshkeys: str = Form(),
    branch: Optional[str] = Form(None),
) -> None:
    """Accept a host-online report and process it in the background."""
    task = asyncio.create_task(handle_host_online(hostname, sshkeys, branch))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)