1
0
Fork 0
xactfetch/xactfetch.py

771 lines
26 KiB
Python

import asyncio
import base64
import copy
import datetime
import json
import logging
import os
import random
import sys
import urllib.parse
from pathlib import Path
from types import TracebackType
from typing import Any, Optional, Type
import httpx
from playwright.async_api import Playwright, Page
from playwright.async_api import async_playwright
# Module-wide logger; logging itself is configured by the entry point.
log = logging.getLogger('xactfetch')

# Service endpoints, each overridable through the environment.
NTFY_URL = os.getenv('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.getenv('NTFY_TOPIC', 'dustin')
FIREFLY_III_URL = os.getenv(
    'FIREFLY_III_URL',
    'https://firefly.pyrocufflink.blue',
)
# NOTE: the importer URL is read from FIREFLY_IMPORT_URL, which does not
# match the constant's own name.
FIREFLY_III_IMPORTER_URL = os.getenv(
    'FIREFLY_IMPORT_URL',
    'https://firefly-importer.pyrocufflink.blue',
)

# Inputs for locating the secrets socket; both are None when unset.
SECRET_SOCKET_PATH = os.getenv('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.getenv('XDG_RUNTIME_DIR')

# Bank -> Firefly III account id.  For Commerce Bank the inner keys are the
# account names used to pick the account link on the bank's site; the values
# (and the single Chase value) are used in Firefly III API URLs and as the
# importer's default account.
ACCOUNTS = {
    'commerce': {
        '8357': 1,
        '7730': 67,
    },
    'chase': 15,
}
class FireflyImporter:
    """Uploads CSV files to a Firefly III importer ``autoupload`` endpoint.

    Attributes:
        url: Base URL of the importer; a trailing slash is tolerated.
        secret: Value sent as the ``secret`` query parameter.
        auth: Optional ``(username, password)`` pair handed to httpx as
            the client's ``auth`` setting.
    """

    def __init__(
        self,
        url: str,
        secret: str,
        auth: Optional[tuple[str, str]],
    ) -> None:
        self.url = url
        self.secret = secret
        self.auth = auth
        # No long-lived HTTP client is created here: import_csv() opens and
        # closes its own client per upload, so an instance holds no
        # resources that would need explicit cleanup.

    async def import_csv(
        self,
        csv: Path,
        config: dict[str, Any],
    ) -> None:
        """Upload *csv* together with its import configuration.

        The CSV is sent as the ``importable`` file part and *config*,
        serialised to JSON, as the ``json`` part of a multipart POST.

        Raises:
            OSError: If *csv* cannot be opened.
            httpx.HTTPStatusError: If the importer answers with an error
                status.
        """
        log.debug('Importing transactions from %s to Firefly III', csv)
        url = f'{self.url.rstrip("/")}/autoupload'
        with csv.open('rb') as f:
            async with httpx.AsyncClient(auth=self.auth) as client:
                r = await client.post(
                    url,
                    params={
                        'secret': self.secret,
                    },
                    headers={
                        'Accept': 'application/json',
                    },
                    files={
                        'importable': ('import.csv', f),
                        'json': ('import.json', json.dumps(config)),
                    },
                    # Generous timeout; applies to this request only.
                    timeout=300,
                )
        r.raise_for_status()
class SecretsClient:
    """Async client for a Unix-socket secrets service.

    A secret is requested by writing its key followed by a newline; the
    reply is whatever the peer sends back, with trailing newlines removed.
    Use as an async context manager so the connection is closed on exit.
    """

    def __init__(self) -> None:
        # (reader, writer) pair.  Only annotated here; the attribute does
        # not exist until connect() has succeeded, which __aenter__ relies
        # on to decide whether to connect.
        self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]

    async def __aenter__(self) -> 'SecretsClient':
        if not hasattr(self, 'sock'):
            await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> bool:
        """Close the connection; never suppresses the body's exception."""
        writer = self.sock[1]
        writer.close()
        try:
            # close() only schedules the shutdown; wait for the transport
            # to actually be released.
            await writer.wait_closed()
        except OSError as e:
            # A connection error during teardown must not replace whatever
            # exception the body raised.
            log.debug('Error while closing secrets socket: %s', e)
        return False

    async def connect(self) -> None:
        """Open the socket.

        The path is taken from SECRET_SOCKET_PATH if set, otherwise from
        ``secretsocket/.ss`` under XDG_RUNTIME_DIR if that is set, otherwise
        ``.secretsocket`` relative to the working directory.
        """
        if SECRET_SOCKET_PATH:
            path = Path(SECRET_SOCKET_PATH)
        elif XDG_RUNTIME_DIR:
            path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
        else:
            path = Path('.secretsocket')
        self.sock = await asyncio.open_unix_connection(str(path))

    async def get_secret(self, key: str) -> bytes:
        """Request *key* and return the reply without trailing newlines.

        A single read of at most 64 KiB is performed.
        """
        reader, writer = self.sock
        writer.write(f'{key}\n'.encode('utf-8'))
        # Flush the request before waiting for the answer.
        await writer.drain()
        buf = await reader.read(64 * 2**10)
        return buf.rstrip(b'\n')
async def ntfy(
    message: Optional[str] = None,
    topic: str = NTFY_TOPIC,
    title: Optional[str] = None,
    tags: Optional[str] = None,
    attach: Optional[bytes] = None,
    filename: Optional[str] = None,
) -> None:
    """Publish a notification to ``{NTFY_URL}/{topic}``.

    Without an attachment the message is POSTed as the request body.  With
    an attachment the bytes are PUT as the body and the message, if any,
    travels in the ``Message`` header instead: newlines are escaped for
    pure-ASCII text, anything else is wrapped by rfc2047_base64encode().

    At least one of *message* and *attach* must be truthy.  *filename* is
    only used when an attachment is sent.  Raises if the server answers
    with an error status.
    """
    assert message or attach
    headers = {'Title': title or 'xactfetch'}
    if tags:
        headers['Tags'] = tags
    url = f'{NTFY_URL}/{topic}'
    client = httpx.AsyncClient()

    if not attach:
        # Plain text notification: the message is the body.
        async with client:
            r = await client.post(url, headers=headers, content=message)
        r.raise_for_status()
        return

    if filename:
        headers['Filename'] = filename
    if message:
        if message.isascii():
            header_text = message.replace('\n', '\\n')
        else:
            header_text = rfc2047_base64encode(message)
        headers['Message'] = header_text
    async with client:
        r = await client.put(url, headers=headers, content=attach)
    r.raise_for_status()
def rfc2047_base64encode(message: str) -> str:
    """Return *message* as a UTF-8, base64 ``=?UTF-8?B?...?=`` encoded-word."""
    payload = base64.b64encode(message.encode("utf-8"))
    return "=?UTF-8?B?" + payload.decode("ascii") + "?="
async def get_last_transaction_date(key: int, token: str) -> datetime.date:
    """Return the newest transaction date Firefly III reports for an account.

    Issues one GET to ``/api/v1/accounts/{key}/transactions`` (no paging is
    performed) using *token* as a bearer token, then scans every split of
    every returned transaction.  Splits whose date is missing or cannot be
    parsed are logged and skipped.  If nothing parses, the date part of
    ``datetime.datetime.min`` is returned.

    Raises whatever httpx raises for transport errors or an error status.
    """
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/vnd.api+json',
    }
    async with httpx.AsyncClient() as client:
        r = await client.get(
            f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions',
            headers=request_headers,
            timeout=10,
        )
    r.raise_for_status()

    newest = datetime.datetime.min
    for xact in r.json()['data']:
        for split in xact['attributes']['transactions']:
            try:
                # Only the part before the 'T' separator is parsed.
                day, _, _ = split['date'].partition('T')
                parsed = datetime.datetime.fromisoformat(day)
            except (KeyError, ValueError) as e:
                log.warning(
                    'Could not parse date from transaction %s: %s',
                    xact['id'],
                    e,
                )
                continue
            newest = max(newest, parsed)
    return newest.date()
async def download_chase(
    page: Page,
    secrets: SecretsClient,
    end_date: datetime.date,
    token: str,
    importer: FireflyImporter,
) -> bool:
    """Fetch Chase transactions up to *end_date* and import them.

    The start date is the last transaction date known to Firefly III.
    Returns False when that date cannot be determined, True when the
    account is already past *end_date*, and otherwise whether the
    download and import completed without ntfyerror swallowing an
    exception.
    """
    async with Chase(page, secrets) as bank, ntfyerror('Chase', page) as outcome:
        account = ACCOUNTS['chase']
        try:
            since = await get_last_transaction_date(account, token)
        except (OSError, ValueError) as e:
            log.error(
                'Skipping Chase account: could not get last transaction: %s',
                e,
            )
            return False
        if since > end_date:
            log.info(
                'Skipping Chase account: last transaction was %s',
                since,
            )
            return True
        await bank.login()
        statement = await bank.download_transactions(since, end_date)
        log.info('Importing transactions from Chase into Firefly III')
        await bank.firefly_import(statement, account, importer)
    # Reached both on success and after ntfyerror swallowed a failure.
    return outcome.success
async def download_commerce(
    page: Page,
    secrets: SecretsClient,
    end_date: datetime.date,
    token: str,
    importer: FireflyImporter,
) -> bool:
    """Fetch and import transactions for every configured Commerce account.

    For each account the start date is the last transaction date known to
    Firefly III; accounts whose date cannot be determined, or that are
    already past *end_date*, are skipped.  All CSVs are downloaded first
    and imported afterwards.  Returns whether the work finished without
    ntfyerror swallowing an exception.
    """
    log.info('Downloading transaction lists from Commerce Bank')
    downloads = []
    async with CommerceBank(page, secrets) as bank, ntfyerror(
        'Commerce Bank', page
    ) as outcome:
        for name, account in ACCOUNTS['commerce'].items():
            try:
                since = await get_last_transaction_date(account, token)
            except (OSError, ValueError) as e:
                log.error(
                    'Skipping account %s: could not get last transaction: %s',
                    name,
                    e,
                )
                continue
            if since > end_date:
                log.info(
                    'Skipping account %s: last transaction was %s',
                    name,
                    since,
                )
                continue
            log.info(
                'Getting transactions since %s for account xxx%s',
                since,
                name,
            )
            # login() returns immediately once a session is established.
            await bank.login()
            await bank.open_account(name)
            statement = await bank.download_transactions(since, end_date)
            downloads.append((account, statement))
        log.info('Importing transactions from Commerce Bank into Firefly III')
        for account, statement in downloads:
            await bank.firefly_import(statement, account, importer)
    return outcome.success
class ntfyerror:
    """Async context manager that swallows an exception and reports it.

    When the body raises, ``success`` is set to False, the exception is
    logged, and (unless the DEBUG_NTFY environment variable is ``0``) a
    browser screenshot is taken, saved locally and sent via ntfy together
    with the error text.  The exception is always suppressed; callers read
    ``success`` afterwards.

    Attributes:
        bank: Name used in the notification title.
        page: Browser page the screenshot is taken from.
        success: True until an exception has been swallowed.
    """

    def __init__(self, bank: str, page: Page) -> None:
        self.bank = bank
        self.page = page
        self.success = True

    async def __aenter__(self) -> 'ntfyerror':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> bool:
        if not (exc_type and exc_value and tb):
            return True

        self.success = False
        log.exception(
            'Swallowed exception:', exc_info=(exc_type, exc_value, tb)
        )
        if os.environ.get('DEBUG_NTFY', '1') == '0':
            return True

        try:
            if ss := await self.page.screenshot():
                await asyncio.to_thread(save_screenshot, ss)
        except Exception:
            log.exception('Failed to get screenshot:')
            ss = None

        # Some exceptions (e.g. a bare ``assert``) stringify to ''.  ntfy()
        # requires a non-empty message or attachment, so fall back to the
        # repr, which always names the exception type.
        message = str(exc_value) or repr(exc_value)
        try:
            await ntfy(
                message=message,
                title=f'xactfetch failed for {self.bank}',
                tags='warning',
                attach=ss,
                filename='screenshot.png',
            )
        except Exception:
            # A notification failure must not escape __aexit__: it would
            # replace the error being reported and defeat the suppression
            # this context manager promises.
            log.exception('Failed to send failure notification:')
        return True
def save_screenshot(screenshot: bytes):
    """Write *screenshot* to a timestamped PNG in the working directory.

    The file name is ``screenshot_<YYYYmmddHHMMSS>.png`` from the current
    local time.  Failures are logged, never raised.
    """
    filename = datetime.datetime.now().strftime('screenshot_%Y%m%d%H%M%S.png')
    log.debug('Saving browser screenshot to %s', filename)
    try:
        Path(filename).write_bytes(screenshot)
    except Exception as e:
        log.error('Failed to save browser screenshot: %s', e)
        return
    log.info('Browser screenshot saved as %s', filename)
class CommerceBank:
    """Drives the Commerce Bank web UI to download transaction CSVs.

    Use as an async context manager; leaving the context logs out if a
    login succeeded.  Credentials and the OTP are fetched from the secrets
    client under the ``bank.commerce.*`` keys.
    """

    URL = 'https://banking.commercebank.com/CBI/Auth/Login'
    # Import configuration sent alongside each CSV.  ``roles`` and
    # ``do_mapping`` have one entry per CSV column.
    IMPORT_CONFIG = {
        'version': 3,
        'source': 'fidi-1.2.2',
        'created_at': '2023-04-27T08:05:10-05:00',
        'date': 'n/j/Y',
        'delimiter': 'comma',
        'headers': True,
        'rules': True,
        'skip_form': False,
        'add_import_tag': True,
        'roles': [
            'date_transaction',
            'internal_reference',
            'description',
            'amount_debit',
            'amount_credit',
        ],
        'do_mapping': [
            False,
            False,
            False,
            False,
            False,
        ],
        'mapping': [],
        'duplicate_detection_method': 'classic',
        'ignore_duplicate_lines': False,
        'unique_column_index': 0,
        'unique_column_type': 'internal_reference',
        'flow': 'file',
        'identifier': '0',
        'connection': '0',
        'ignore_spectre_categories': False,
        'map_all_data': False,
        'accounts': [],
        'date_range': '',
        'date_range_number': 30,
        'date_range_unit': 'd',
        'date_not_before': '',
        'date_not_after': '',
        'nordigen_country': '',
        'nordigen_bank': '',
        'nordigen_requisitions': [],
        'nordigen_max_days': '90',
        'conversion': False,
        'ignore_duplicate_transactions': True,
    }

    def __init__(self, page: Page, secrets: SecretsClient) -> None:
        self.page = page
        self.secrets = secrets
        # Tracks whether login() completed, so login()/logout() can be
        # called unconditionally.
        self._logged_in = False

    async def __aenter__(self) -> 'CommerceBank':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> None:
        await self.logout()

    async def login(self) -> None:
        """Log in with username, password and OTP; no-op if already done."""
        if self._logged_in:
            return
        log.debug('Navigating to %s', self.URL)
        await self.page.goto(self.URL)
        username = await self.get_secret('bank.commerce.username')
        password = await self.get_secret('bank.commerce.password')
        log.debug('Filling username/password login form')
        await self.page.get_by_role('textbox', name='Customer ID').fill(
            username
        )
        await self.page.get_by_role('textbox', name='Password').fill(password)
        await self.page.get_by_role('button', name='Log In').click()
        log.debug('Waiting for OTP 2FA form')
        otp_input = self.page.locator('id=securityCodeInput')
        await otp_input.wait_for()
        # Randomised pause before interacting with the form.
        await self.page.wait_for_timeout(random.randint(1000, 3000))
        log.debug('Filling OTP 2FA form')
        otp = await self.get_secret('bank.commerce.otp')
        await otp_input.fill(otp)
        async with self.page.expect_event('load'):
            await self.page.get_by_role('button', name='Continue').click()
        log.debug('Waiting for page load')
        await self.page.wait_for_load_state()
        # If login landed somewhere other than the account summary,
        # navigate there explicitly.
        cur_url = urllib.parse.urlparse(self.page.url)
        if cur_url.path != '/CBI/Accounts/Summary':
            new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
            await self.page.goto(urllib.parse.urlunparse(new_url))
        log.info('Successfully logged in to Commerce Bank')
        self._logged_in = True

    async def logout(self) -> None:
        """Log out via the navigation bar; no-op if not logged in."""
        if not self._logged_in:
            return
        log.debug('Logging out of Commerce Bank')
        async with self.page.expect_event('load'):
            await self.page.get_by_test_id('navWrap').get_by_text(
                'Logout'
            ).click()
        # Clear the flag so a later login() really logs in again instead
        # of returning early on stale state.
        self._logged_in = False
        log.info('Logged out of Commerce Bank')

    async def open_account(self, account: str) -> None:
        """Open the activity page of the account whose link is *account*."""
        log.debug('Navigating to activity page for account %s', account)
        if '/Activity/' in self.page.url:
            # Already on an activity page: open the account list first.
            await self.page.get_by_role('button', name='My Accounts').click()
        async with self.page.expect_event('load'):
            await self.page.get_by_role('link', name=account).click()
        await self.page.wait_for_load_state()
        await self.page.wait_for_timeout(random.randint(1000, 3000))
        log.info('Loaded activity page for account %s', account)

    async def download_transactions(
        self, from_date: datetime.date, to_date: datetime.date
    ) -> Path:
        """Download a CSV of transactions for the currently open account.

        Fills the download dialog with the given date range (formatted
        ``MM/DD/YYYY``), selects the comma-separated format and returns the
        path of the downloaded file.
        """
        log.info('Downloading transactions from %s to %s', from_date, to_date)
        datefmt = '%m/%d/%Y'
        await self.page.get_by_role(
            'link', name='Download Transactions'
        ).click()
        await self.page.wait_for_timeout(random.randint(750, 1250))
        modal = self.page.locator('#download-transactions')
        # Each date field is cleared with select-all + delete and then
        # typed key by key.
        input_from = modal.locator('input[data-qaid=fromDate]')
        await input_from.click()
        await self.page.keyboard.press('Control+A')
        await self.page.keyboard.press('Delete')
        await self.page.keyboard.type(from_date.strftime(datefmt))
        input_to = modal.locator('input[data-qaid=toDate]')
        await input_to.click()
        await self.page.keyboard.press('Control+A')
        await self.page.keyboard.press('Delete')
        await self.page.keyboard.type(to_date.strftime(datefmt))
        await modal.get_by_role('button', name='Select Type').click()
        await self.page.get_by_text('Comma Separated').click()
        async with self.page.expect_download() as di:
            await self.page.get_by_role('button', name='Download').click()
        log.debug('Waiting for download to complete')
        path = await (await di.value).path()
        assert path
        log.info('Downloaded transactions to %s', path)
        await modal.get_by_label('Close').click()
        return path

    async def firefly_import(
        self, csv: Path, account: int, importer: FireflyImporter
    ) -> None:
        """Import *csv* into Firefly III with *account* as default account."""
        # Deep copy so the class-level template is never mutated.
        config = copy.deepcopy(self.IMPORT_CONFIG)
        config['default_account'] = account
        await importer.import_csv(csv, config)

    async def get_secret(self, key: str) -> str:
        """Fetch *key* from the secrets client and decode it to ``str``."""
        secret = await self.secrets.get_secret(key)
        return secret.decode()
class Chase:
    """Drives the Chase web UI to download credit-card transaction CSVs.

    Use as an async context manager: saved cookies are loaded into the
    browser context on entry; on exit the session is logged out (if a
    login succeeded) and the cookies are written back.  Credentials and
    the OTP are fetched from the secrets client under ``bank.chase.*``.
    """

    URL = 'https://secure26ea.chase.com/web/auth/dashboard'
    # Import configuration for the CSV layout that starts with a "Card"
    # column; firefly_import() drops the first role/mapping entry for the
    # layout without it.
    IMPORT_CONFIG = {
        'version': 3,
        'source': 'fidi-1.2.2',
        'created_at': '2023-04-27T09:54:42-05:00',
        'date': 'n/j/Y',
        'delimiter': 'comma',
        'headers': True,
        'rules': True,
        'skip_form': False,
        'add_import_tag': True,
        'roles': [
            '_ignore',
            'date_transaction',
            'date_process',
            'description',
            'tags-comma',
            '_ignore',
            'amount',
            'note',
        ],
        'do_mapping': [
            False,
            False,
            False,
            False,
            False,
            False,
            False,
            False,
        ],
        'mapping': [],
        'duplicate_detection_method': 'classic',
        'ignore_duplicate_lines': True,
        'unique_column_index': 0,
        'unique_column_type': 'internal_reference',
        'flow': 'file',
        'identifier': '0',
        'connection': '0',
        'ignore_spectre_categories': False,
        'map_all_data': True,
        'accounts': [],
        'date_range': '',
        'date_range_number': 30,
        'date_range_unit': 'd',
        'date_not_before': '',
        'date_not_after': '',
        'nordigen_country': '',
        'nordigen_bank': '',
        'nordigen_requisitions': [],
        'nordigen_max_days': '90',
        'conversion': False,
        'ignore_duplicate_transactions': True,
    }

    def __init__(self, page: Page, secrets: SecretsClient) -> None:
        self.page = page
        self.secrets = secrets
        # Cookie jar persisted between runs, relative to the working
        # directory.
        self.saved_cookies = Path('cookies.json')
        self._logged_in = False

    async def __aenter__(self) -> 'Chase':
        await self.load_cookies()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> None:
        try:
            await self.logout()
        finally:
            # Persist cookies even if logging out failed.
            await self.save_cookies()

    async def load_cookies(self) -> None:
        """Load saved cookies into the browser context (best effort)."""
        log.debug('Loading saved cookies from %s', self.saved_cookies)
        try:
            with self.saved_cookies.open(encoding='utf-8') as f:
                cookies = await asyncio.to_thread(json.load, f)
            await self.page.context.add_cookies(cookies)
        except Exception as e:
            # A missing or unreadable file is not an error: the login flow
            # simply starts without saved cookies.
            log.debug('Failed to load saved cookies: %s', e)
        else:
            log.info('Successfully loaded saved cookies')

    async def save_cookies(self) -> None:
        """Write the browser context's cookies to disk (best effort)."""
        log.debug('Saving cookies to %s', self.saved_cookies)
        try:
            # Collect and serialise the cookies BEFORE opening the file:
            # opening with 'w' truncates it, so a failure while fetching
            # would otherwise wipe the previously saved cookies.
            cookies = await self.page.context.cookies()
            data = await asyncio.to_thread(json.dumps, cookies)
            with self.saved_cookies.open('w', encoding='utf-8') as f:
                f.write(data)
        except Exception as e:
            log.error('Failed to save cookies: %s', e)
        else:
            log.info('Successfully saved cookies to %s', self.saved_cookies)

    async def login(self) -> None:
        """Log in, handling the device-verification prompt if it appears.

        No-op if a login already succeeded on this instance.
        """
        if self._logged_in:
            return
        log.debug('Navigating to %s', self.URL)
        await self.page.goto(self.URL)
        await self.page.wait_for_load_state()
        await self.page.wait_for_timeout(random.randint(2000, 4000))
        username = await self.get_secret('bank.chase.username')
        password = await self.get_secret('bank.chase.password')
        log.debug('Filling username/password login form')
        logonbox = self.page.frame_locator('#logonbox')
        await logonbox.get_by_label('Username').fill(username)
        await logonbox.get_by_label('Password').fill(password)
        await self.page.wait_for_timeout(random.randint(500, 750))
        await logonbox.get_by_role('button', name='Sign in').click()
        log.debug('Waiting for page load')
        await self.page.wait_for_load_state()
        logonframe = self.page.frame_locator('iframe[title="logon"]')
        # After signing in either the device-verification heading or the
        # dashboard's "Pay Card" button shows up; race the two waits.
        t_2fa = asyncio.create_task(
            logonframe.get_by_role(
                'heading', name="We don't recognize this device"
            ).wait_for()
        )
        t_finished = asyncio.create_task(
            self.page.get_by_role('button', name='Pay Card').wait_for()
        )
        done, pending = await asyncio.wait(
            (t_2fa, t_finished),
            return_when=asyncio.FIRST_COMPLETED,
        )
        for t in pending:
            t.cancel()
        for t in done:
            # Re-raise here if the finished wait actually failed.
            await t
        if t_2fa in done:
            log.warning('Device verification (SMS 2-factor auth) required')
            await logonframe.get_by_label('Tell us how: Choose one').click()
            await logonframe.locator(
                '#container-1-simplerAuth-dropdownoptions-styledselect'
            ).click()
            # Request the OTP before clicking "Next" so the request is
            # already pending when the code is sent.
            otp_task = asyncio.create_task(self.get_secret('bank.chase.otp'))
            await logonframe.get_by_role('button', name='Next').click()
            log.info('Waiting for SMS verification code')
            otp = await otp_task
            log.debug('Filling verification code form')
            await logonframe.get_by_label('One-time code').fill(otp)
            await logonframe.get_by_label('Password').fill(password)
            await logonframe.get_by_role('button', name='Next').click()
            await self.page.wait_for_load_state()
            await self.page.get_by_role('button', name='Pay Card').wait_for()
        log.info('Successfully logged in to Chase')
        self._logged_in = True

    async def download_transactions(
        self, from_date: datetime.date, to_date: datetime.date
    ) -> Path:
        """Download the card's activity for the date range as a CSV.

        Dates are entered as ``MM/DD/YYYY``.  Returns the path of the
        downloaded file.
        """
        log.info('Downloading transactions from %s to %s', from_date, to_date)
        fmt = '%m/%d/%Y'
        await self.page.locator('#CARD_ACCOUNTS').get_by_role(
            'button', name='(...2467)'
        ).first.click()
        fl = self.page.locator('#flyout')
        await fl.wait_for()
        # Wait for several flyout controls to exist before interacting.
        await fl.get_by_role('button', name='Pay card', exact=True).wait_for()
        await fl.get_by_role(
            'button', name='Account activity', exact=True
        ).wait_for()
        await fl.get_by_role('link', name='Show details').wait_for()
        await fl.get_by_role(
            'link', name='Activity since last statement'
        ).click()
        await fl.get_by_role('link', name='All transactions').click()
        await fl.get_by_text('See more activity').wait_for()
        await fl.get_by_role(
            'button', name='Download Account Activity'
        ).click()
        log.debug('Filling account activity download form')
        await self.page.locator(
            '#select-downloadActivityOptionId-label'
        ).click()
        await self.page.get_by_text('Choose a date range').nth(1).locator(
            '../..'
        ).click()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        await self.page.locator('#accountActivityFromDate-input-input').fill(
            from_date.strftime(fmt)
        )
        await self.page.locator('#accountActivityFromDate-input-input').blur()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        await self.page.locator('#accountActivityToDate-input-input').fill(
            to_date.strftime(fmt)
        )
        await self.page.locator('#accountActivityToDate-input-input').blur()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        async with self.page.expect_download(timeout=5000) as di:
            await self.page.get_by_role(
                'button', name='Download', exact=True
            ).click()
        log.debug('Waiting for download to complete')
        await self.page.wait_for_timeout(random.randint(1000, 2500))
        path = await (await di.value).path()
        assert path
        log.info('Downloaded transactions to %s', path)
        return path

    async def logout(self) -> None:
        """Sign out; no-op if not logged in."""
        if not self._logged_in:
            return
        log.debug('Logging out of Chase')
        async with self.page.expect_event('load'):
            await self.page.get_by_role('button', name='Sign out').click()
        # Clear the flag so a later login() really logs in again instead
        # of returning early on stale state.
        self._logged_in = False
        log.info('Logged out of Chase')

    async def firefly_import(
        self, csv: Path, account: int, importer: FireflyImporter
    ) -> None:
        """Import *csv* into Firefly III with *account* as default account.

        The CSV header line selects the schema: a header starting with
        ``Card`` uses IMPORT_CONFIG as is; a header with exactly six commas
        is treated as the layout without that leading column.

        Raises:
            ValueError: If the header matches neither layout.
        """
        # Deep copy so the class-level template is never mutated.
        config = copy.deepcopy(self.IMPORT_CONFIG)
        config['default_account'] = account
        with csv.open('r', encoding='utf-8') as f:
            headers = f.readline()
        if headers.startswith('Card'):
            log.debug('Detected CSV schema with Card column')
        elif headers.count(',') == 6:
            log.debug('Detected CSV schema without Card column')
            config['roles'].pop(0)
            config['do_mapping'].pop(0)
        else:
            raise ValueError(f'Unexpected CSV schema: {headers}')
        await importer.import_csv(csv, config)

    async def get_secret(self, key: str) -> str:
        """Fetch *key* from the secrets client and decode it to ``str``."""
        secret = await self.secrets.get_secret(key)
        return secret.decode()
async def fetch_transactions(pw: Playwright, secrets: SecretsClient) -> bool:
    """Download and import transactions for the selected banks.

    Banks are taken from the command-line arguments, or all keys of
    ACCOUNTS when none are given.  Commerce Bank is processed before
    Chase.  Transactions up to and including yesterday are requested.
    When any bank fails, the browser trace is saved to ``trace.zip`` and
    sent via ntfy.

    Returns:
        True if at least one bank failed, False otherwise.
    """
    log.debug('Getting Firefly III access token')
    token = (await secrets.get_secret('firefly.token')).decode()
    import_secret = (
        await secrets.get_secret('firefly.import.secret')
    ).decode()
    import_user = (await secrets.get_secret('firefly.import.username')).decode()
    import_password = (
        await secrets.get_secret('firefly.import.password')
    ).decode()
    importer = FireflyImporter(
        FIREFLY_III_IMPORTER_URL,
        import_secret,
        (import_user, import_password),
    )
    end_date = datetime.date.today() - datetime.timedelta(days=1)

    browser = await pw.chromium.launch(headless=False)
    context = await browser.new_context()
    # Record a trace for the whole session; it is only kept on failure.
    await context.tracing.start(screenshots=True, snapshots=True)
    page = await context.new_page()

    requested = sys.argv[1:] or list(ACCOUNTS.keys())
    failed = False
    if 'commerce' in requested:
        ok = await download_commerce(page, secrets, end_date, token, importer)
        if not ok:
            failed = True
    if 'chase' in requested:
        ok = await download_chase(page, secrets, end_date, token, importer)
        if not ok:
            failed = True

    if failed:
        await context.tracing.stop(path='trace.zip')
        with open('trace.zip', 'rb') as f:
            trace = f.read()
        await ntfy(
            'Downloading one or more transaction lists failed.',
            attach=trace,
            filename='trace.zip',
        )
    return failed
async def amain() -> None:
    """Run the whole fetch and terminate with an exit status.

    Always raises SystemExit: status 0 when every bank succeeded, 1 when
    any failed or when no result was obtained.
    """
    logging.basicConfig(level=logging.DEBUG)
    # Pessimistic default: if the run is cut short before a result exists,
    # report failure rather than success.
    failed = True
    async with SecretsClient() as secrets:
        try:
            async with async_playwright() as pw:
                failed = await fetch_transactions(pw, secrets)
        except asyncio.exceptions.InvalidStateError as e:
            log.debug('Ignoring exception: %s', e, exc_info=sys.exc_info())
    # Raised only after the handler above, so an ignored InvalidStateError
    # can no longer swallow the exit status and make a failed run exit 0.
    raise SystemExit(1 if failed else 0)
def main() -> None:
    """Synchronous entry point: run amain() on a fresh event loop."""
    coro = amain()
    asyncio.run(coro)


if __name__ == '__main__':
    main()