1
0
Fork 0
dch-webhooks/dch_webhooks.py

430 lines
12 KiB
Python

import base64
import datetime
import importlib.metadata
import logging
import os
import re
from types import TracebackType
from typing import Optional, Self, Type
import fastapi
import httpx
import pydantic
import pyrfc6266
# Only the ASGI application is part of this module's public interface.
__all__ = ['app']

log = logging.getLogger(__name__)

# Package metadata looked up under this module's name; its "Name" and
# "Version" fields feed the User-Agent header and the FastAPI metadata.
DIST = importlib.metadata.metadata(__name__)
# Matches every character that is neither an ASCII lowercase letter nor a
# space; clean_description() deletes these from the lower-cased text.
DESCRIPTION_CLEAN_PATTERN = re.compile(r'[^a-z ]')

# Generic filler words dropped from the search terms built by
# clean_description().
EXCLUDE_DESCRIPTION_WORDS = set(
    'a ach an card debit pay payment purchase retail the'.split()
)
def env_url(name: str, default: str) -> str:
    """Return the base URL in environment variable *name*, else *default*.

    Trailing slashes are stripped so that callers building request URLs
    with ``f'{url}/api/...'`` never produce a ``//`` in the path.
    """
    return os.environ.get(name, default).rstrip('/')


# Base URLs of the services this application talks to.
FIREFLY_URL = env_url('FIREFLY_URL', 'http://firefly-iii')
# Largest Paperless-ngx document, in bytes (default 50 MiB), that will be
# attached to a transaction.
MAX_DOCUMENT_SIZE = int(os.environ.get('MAX_DOCUMENT_SIZE', 50 * 2**20))
NTFY_URL = env_url('NTFY_URL', 'http://ntfy.ntfy:2586')
PAPERLESS_URL = env_url('PAPERLESS_URL', 'http://paperless-ngx')
class FireflyIIITransactionSplit(pydantic.BaseModel):
    """One split of a Firefly-III transaction as received in a webhook.

    Only the fields this service reads are declared; pydantic ignores other
    keys in the payload by default.
    """

    type: str
    # Only the date part is used, as the centre of the Paperless search window.
    date: datetime.datetime
    # Kept as the raw string; handle_firefly_transaction() converts it with
    # float() and skips the split if conversion fails.
    amount: str
    # Sent as ``attachable_id`` when attaching receipts in Firefly-III.
    transaction_journal_id: int
    # Reduced to search terms by clean_description().
    description: str
class FireflyIIITransaction(pydantic.BaseModel):
    """Transaction payload of a Firefly-III webhook; only its splits are used."""

    # Each split is searched for and receives its own receipt attachments.
    transactions: list[FireflyIIITransactionSplit]
class FireflyIIIWebhook(pydantic.BaseModel):
    """Request body of a Firefly-III webhook call."""

    # The transaction the webhook refers to.
    content: FireflyIIITransaction
class PaperlessNgxDocument(pydantic.BaseModel):
    """A document entry from a Paperless-ngx document search."""

    # Used to build the document's download URL.
    id: int
    # Passed on as the title of the Firefly-III attachment.
    title: str
class PaperlessNgxSearchResults(pydantic.BaseModel):
    """One page of results from the Paperless-ngx ``/api/documents/`` search."""

    # Total number of matches reported by the server (logged only).
    count: int
    # URL of the next page; a non-None value means results were truncated.
    next: str | None
    # URL of the previous page; not used by this service.
    previous: str | None
    # Documents on this page.
    results: list[PaperlessNgxDocument]
class HttpxClientMixin:
    """Mixin that owns a lazily created ``httpx.AsyncClient``.

    Subclasses customise the client (e.g. authentication headers) by
    overriding :meth:`_get_client`.  Instances are async context managers:
    entering opens the client and exiting closes it.  The closed client is
    then discarded, so the same instance can be entered again and will get
    a fresh client instead of failing on a closed one.
    """

    def __init__(self) -> None:
        super().__init__()
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> Self:
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Detach before closing: a closed AsyncClient cannot be reopened, so
        # the next access to ``self.client`` must build a new one.
        client, self._client = self.client, None
        await client.__aexit__(exc_type, exc_value, tb)

    @property
    def client(self) -> httpx.AsyncClient:
        """The HTTP client, created with :meth:`_get_client` on first use."""
        if self._client is None:
            self._client = self._get_client()
        return self._client

    def _get_client(self) -> httpx.AsyncClient:
        """Build the client; subclasses extend this to add settings."""
        return httpx.AsyncClient(
            headers={
                'User-Agent': f'{DIST["Name"]}/{DIST["Version"]}',
            },
        )
class Firefly(HttpxClientMixin):
    """Async client for the Firefly-III REST API at ``FIREFLY_URL``.

    When ``FIREFLY_AUTH_TOKEN`` names a readable file, its stripped contents
    are sent as a bearer token; an unreadable file is logged and ignored.
    """

    def _get_client(self) -> httpx.AsyncClient:
        client = super()._get_client()
        token_path = os.environ.get('FIREFLY_AUTH_TOKEN')
        if not token_path:
            return client
        try:
            token_fh = open(token_path, encoding='utf-8')
        except OSError as err:
            log.error('Could not load Firefly-III access token: %s', err)
            return client
        with token_fh:
            token = token_fh.read().strip()
        client.headers['Authorization'] = f'Bearer {token}'
        return client

    async def attach_receipt(
        self,
        xact_id: int,
        doc: bytes,
        filename: str,
        title: str | None = None,
    ) -> None:
        """Attach *doc* to transaction journal *xact_id* in Firefly-III.

        First creates the attachment record (optionally titled), then
        uploads the raw bytes to it.

        Raises:
            httpx.HTTPStatusError: if either request returns an error status.
        """
        log.info('Attaching receipt %r to transaction %d', filename, xact_id)
        form: dict[str, str | int] = {
            'filename': filename,
            'attachable_type': 'TransactionJournal',
            'attachable_id': xact_id,
        }
        if title:
            form['title'] = title
        created = await self.client.post(
            f'{FIREFLY_URL}/api/v1/attachments', data=form
        )
        created.raise_for_status()
        attachment_id = created.json()['data']['id']
        uploaded = await self.client.post(
            f'{FIREFLY_URL}/api/v1/attachments/{attachment_id}/upload',
            content=doc,
            headers={'Content-Type': 'application/octet-stream'},
        )
        uploaded.raise_for_status()
class Paperless(HttpxClientMixin):
    """Async client for the Paperless-ngx REST API at ``PAPERLESS_URL``.

    When ``PAPERLESS_AUTH_TOKEN`` names a readable file, its stripped
    contents are sent as a ``Token`` Authorization header; an unreadable
    file is logged and ignored.
    """

    def _get_client(self) -> httpx.AsyncClient:
        client = super()._get_client()
        if token_file := os.environ.get('PAPERLESS_AUTH_TOKEN'):
            try:
                f = open(token_file, encoding='utf-8')
            except OSError as e:
                log.error(
                    'Could not load Paperless-ngx authentication token: %s', e
                )
            else:
                with f:
                    token = f.read().strip()
                client.headers['Authorization'] = f'Token {token}'
        return client

    async def find_receipts(
        self, search: str, amount: float, date: datetime.date
    ) -> list[tuple[str, str, bytes]]:
        """Find and download receipts matching a transaction.

        Runs a full-text query combining *search*, ``str(amount)``, the
        ``Invoice/Receipt`` document type and a creation-date range of two
        days either side of *date*, then downloads the original file of
        each document on the first page of results.

        Returns:
            ``(filename, title, content)`` tuples, one per document that
            was downloaded.  Search and per-document failures are logged and
            produce an empty or partial list; only transport errors on the
            search request itself propagate.
        """
        date_begin = date - datetime.timedelta(days=2)
        date_end = date + datetime.timedelta(days=2)
        query = ' '.join(
            (
                search,
                str(amount),
                'type:Invoice/Receipt',
                f'created:[{date_begin} TO {date_end}]',
            )
        )
        log.info('Searching for receipt in Paperless: %s', query)
        docs: list[tuple[str, str, bytes]] = []
        results = await self._search_documents(query)
        if results is None:
            return docs
        log.info('Search returned %d documents', results.count)
        if results.next:
            log.warning(
                'Search returned multiple pages of results; '
                'only the results on the first page are used'
            )
        for doc in results.results:
            downloaded = await self._download_document(doc)
            if downloaded is not None:
                filename, content = downloaded
                docs.append((filename, doc.title, content))
        return docs

    async def _search_documents(
        self, query: str
    ) -> PaperlessNgxSearchResults | None:
        """Run a document search; return parsed results, or None on error."""
        url = f'{PAPERLESS_URL}/api/documents/'
        r = await self.client.get(url, params={'query': query})
        if r.status_code != 200:
            # Only pay for parsing the error body if it will be logged.
            if log.isEnabledFor(logging.ERROR):
                log.error(
                    'Error searching Paperless: HTTP %d %s: %s',
                    r.status_code,
                    r.reason_phrase,
                    self._error_detail(r),
                )
            return None
        try:
            data = r.json()
        except ValueError as e:
            log.error('Failed to parse HTTP response as JSON: %s', e)
            return None
        try:
            return PaperlessNgxSearchResults.parse_obj(data)
        except pydantic.ValidationError as e:
            log.error('Could not parse search response: %s', e)
            return None

    @staticmethod
    def _error_detail(r: httpx.Response) -> str:
        """Extract the error detail from a failed response for logging."""
        try:
            data = r.json()
        except ValueError as e:
            log.debug('Failed to parse HTTP error response as JSON: %s', e)
            return r.text
        # A JSON error body is expected to be an object with a ``detail``
        # key; any other shape (list, scalar) yields an empty detail
        # instead of raising TypeError.
        if isinstance(data, dict):
            return str(data.get('detail', ''))
        return ''

    async def _download_document(
        self, doc: PaperlessNgxDocument
    ) -> tuple[str, bytes] | None:
        """Download a document's original file; None if it was skipped.

        The response is streamed so the ``MAX_DOCUMENT_SIZE`` limit is
        enforced from ``Content-Length`` *before* the body is read: an
        oversized document is never loaded into memory.  Transport errors
        are logged so one bad document does not abort the others.
        """
        url = f'{PAPERLESS_URL}/api/documents/{doc.id}/download/'
        try:
            async with self.client.stream(
                'GET', url, params={'original': True}
            ) as r:
                if r.status_code != 200:
                    log.error(
                        'Failed to download document: HTTP %d %s',
                        r.status_code,
                        r.reason_phrase,
                    )
                    return None
                try:
                    size = int(r.headers['Content-Length'])
                except (KeyError, ValueError) as e:
                    log.error(
                        'Skipping document ID %d: '
                        'Cannot determine file size: %s',
                        doc.id,
                        e,
                    )
                    return None
                if size > MAX_DOCUMENT_SIZE:
                    log.warning(
                        'Skipping document ID %d: Size (%d bytes) is greater '
                        'than the configured maximum document size '
                        '(%d bytes)',
                        doc.id,
                        size,
                        MAX_DOCUMENT_SIZE,
                    )
                    return None
                return response_filename(r), await r.aread()
        except httpx.HTTPError as e:
            log.error('Failed to download document ID %d: %s', doc.id, e)
            return None
async def handle_firefly_transaction(xact: FireflyIIITransaction) -> None:
    """Find Paperless-ngx receipts for each split of *xact* and attach them.

    For every split, the description is reduced to search terms with
    clean_description() and searched together with the amount and date.
    Each document found is uploaded to Firefly-III as an attachment of the
    split's transaction journal.  A split whose amount is not a number is
    skipped.  A failure to attach one receipt is logged with its traceback
    and does not stop the remaining receipts.
    """
    async with Firefly() as ff, Paperless() as pl:
        for split in xact.transactions:
            search = clean_description(split.description)
            try:
                amount = float(split.amount)
            except ValueError as e:
                log.error('Invalid transaction amount: %s', e)
                continue
            receipts = await pl.find_receipts(
                search, amount, split.date.date()
            )
            for filename, title, doc in receipts:
                try:
                    await ff.attach_receipt(
                        split.transaction_journal_id, doc, filename, title
                    )
                except Exception:
                    # Best effort per receipt; keep the traceback for
                    # diagnosis instead of only the exception text.
                    log.exception(
                        'Failed to attach receipt %r to transaction ID %d',
                        filename,
                        split.transaction_journal_id,
                    )
def clean_description(text: str) -> str:
    """Reduce a transaction description to search terms.

    The text is lower-cased and every character matched by
    ``DESCRIPTION_CLEAN_PATTERN`` is deleted.  The remaining words, with
    duplicates and ``EXCLUDE_DESCRIPTION_WORDS`` removed, are joined by
    single spaces in order of first appearance.  If nothing at all is left
    after deleting characters, a warning is logged and *text* is returned
    unchanged.
    """
    cleaned = DESCRIPTION_CLEAN_PATTERN.sub('', text.lower())
    if not cleaned:
        log.warning(
            'Failed to clean transaction description: '
            'nothing left after removing disallowed characters'
        )
        return text
    # dict.fromkeys de-duplicates while preserving order, so the output is
    # deterministic (a set of str iterates in an order that varies between
    # processes because of hash randomization).
    terms = dict.fromkeys(
        word
        for word in cleaned.split()
        if word not in EXCLUDE_DESCRIPTION_WORDS
    )
    return ' '.join(terms)
def response_filename(response: httpx.Response) -> str:
if cd := response.headers.get('Content-Disposition'):
__, params = pyrfc6266.parse(cd)
maybename = ''
for p in params:
if p.name == 'filename*':
return p.value
if p.name == 'filename':
maybename = p.value
if maybename:
if maybename.startswith("b'") and maybename.endswith("'"):
maybename = maybename[2:-1]
return maybename
return response.url.path.rstrip('/').rsplit('/', 1)[-1]
async def ntfy(
    message: Optional[str],
    topic: str,
    title: Optional[str] = None,
    tags: Optional[str] = None,
    attach: Optional[bytes] = None,
    filename: Optional[str] = None,
    cache: Optional[bool] = None,
) -> None:
    """Publish a notification to *topic* on the ntfy server at ``NTFY_URL``.

    Without *attach*, *message* is POSTed as the request body.  With
    *attach*, the bytes are PUT as the body and *message* (if any) goes in
    the ``Message`` header.  There, non-ASCII text is RFC 2047 encoded and
    newlines in ASCII text are replaced by a literal backslash-n.

    Args:
        message: notification text; may be omitted only when *attach* is set.
        topic: topic name appended to ``NTFY_URL``.
        title: optional ``Title`` header, RFC 2047 encoded if non-ASCII.
        tags: optional ``Tags`` header value.
        attach: optional attachment content.
        filename: optional ``Filename`` header, sent only with *attach*.
        cache: when not None, sent as ``Cache: yes`` / ``Cache: no``.

    Raises:
        ValueError: if neither *message* nor *attach* is given.
        httpx.HTTPError: if the request fails or returns an error status.
    """
    # Explicit check instead of ``assert``, which is stripped under -O.
    if not (message or attach):
        raise ValueError('ntfy() needs a message or an attachment')
    headers: dict[str, str] = {}
    if title:
        # Header values must be ASCII; encode like the Message header below.
        headers['Title'] = (
            title if title.isascii() else rfc2047_base64encode(title)
        )
    if tags:
        headers['Tags'] = tags
    if cache is not None:
        headers['Cache'] = 'yes' if cache else 'no'
    url = f'{NTFY_URL}/{topic}'
    # Context manager closes the connection pool; the client used to leak.
    async with httpx.AsyncClient() as client:
        if attach:
            if filename:
                headers['Filename'] = filename
            if message:
                if message.isascii():
                    message = message.replace('\n', '\\n')
                else:
                    message = rfc2047_base64encode(message)
                headers['Message'] = message
            r = await client.put(url, headers=headers, content=attach)
        else:
            r = await client.post(url, headers=headers, content=message)
        r.raise_for_status()
def rfc2047_base64encode(
    message: str,
) -> str:
    """Wrap *message* as a single RFC 2047 "B" (base64) encoded-word.

    The text is UTF-8 encoded, base64 encoded and framed as
    ``=?UTF-8?B?<base64>?=``.  No folding is done, so long input yields
    one encoded-word longer than RFC 2047's 75-character limit.
    """
    payload = base64.b64encode(message.encode("utf-8"))
    return "=?UTF-8?B?" + payload.decode("ascii") + "?="
# The ASGI application.  FastAPI has no ``name`` parameter (it would only be
# stored in ``app.extra``); ``title`` is what sets the application name shown
# in the OpenAPI schema and the docs served at ``docs_url``.
app = fastapi.FastAPI(
    title=DIST['Name'],
    version=DIST['Version'],
    docs_url='/api-doc/',
)
@app.on_event('startup')
def on_start() -> None:
    """On application startup, emit this module's logs at DEBUG and above.

    A StreamHandler (stderr by default) is attached to the module logger.
    """
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)
@app.get('/')
def status() -> str:
    """Health-check endpoint; unconditionally reports ``'UP'``."""
    state = 'UP'
    return state
@app.post('/hooks/firefly-iii/create')
async def firefly_iii_create(hook: FireflyIIIWebhook) -> None:
    """Receive a Firefly-III webhook and attach receipts to its transaction.

    FastAPI validates the JSON body as a FireflyIIIWebhook before this runs.
    """
    transaction = hook.content
    await handle_firefly_transaction(transaction)
# ntfy emoji tag for each Jenkins build result; anything else (including a
# missing/None result) maps to 'white_circle'.
_JENKINS_RESULT_TAGS = {
    'FAILURE': 'red_circle',
    'SUCCESS': 'green_circle',
    'UNSTABLE': 'yellow_circle',
    'NOT_BUILT': 'large_blue_circle',
    'ABORTED': 'black_circle',
}


def _jenkins_build_cause(data: dict) -> str | None:
    """Return the first non-empty cause ``shortDescription`` of a run.

    Tolerates ``actions``/``causes`` being absent or null and skips
    entries that are not JSON objects.
    """
    for action in data.get('actions') or []:
        if not isinstance(action, dict):
            continue
        for cause in action.get('causes') or []:
            if isinstance(cause, dict) and (
                description := cause.get('shortDescription')
            ):
                return description
    return None


@app.post('/hooks/jenkins')
async def jenkins_notify(request: fastapi.Request) -> None:
    """Relay Jenkins run events to the ``jenkins`` ntfy topic.

    ``run.started`` and ``run.finalized`` events produce a notification;
    any other payload (another ``type``, no ``type``, non-object body) is
    ignored.  Notification delivery failures are logged, not raised.
    """
    body = await request.json()
    if not isinstance(body, dict):
        return
    data = body.get('data') or {}
    event = body.get('type')
    if event == 'run.started':
        title = 'Build started'
        tag = 'building_construction'
        message = f'Build started: {data["fullDisplayName"]}'
        if build_cause := _jenkins_build_cause(data):
            message = f'{message} ({build_cause})'
    elif event == 'run.finalized':
        result = data['result']
        message = f'{data["fullDisplayName"]} {result}'
        title = 'Build finished'
        tag = _JENKINS_RESULT_TAGS.get(result, 'white_circle')
    else:
        return
    try:
        await ntfy(message, 'jenkins', title, tag, cache=False)
    except httpx.HTTPError:
        log.exception('Failed to send notification:')