771 lines
26 KiB
Python
771 lines
26 KiB
Python
import asyncio
|
|
import base64
|
|
import copy
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import sys
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from types import TracebackType
|
|
from typing import Any, Optional, Type
|
|
|
|
import httpx
|
|
from playwright.async_api import Playwright, Page
|
|
from playwright.async_api import async_playwright
|
|
|
|
|
|
# Module-wide logger.
log = logging.getLogger('xactfetch')

# Service endpoints; each can be overridden through the environment.
NTFY_URL = os.getenv('NTFY_URL', 'https://ntfy.pyrocufflink.blue')
NTFY_TOPIC = os.getenv('NTFY_TOPIC', 'dustin')
FIREFLY_III_URL = os.getenv(
    'FIREFLY_III_URL',
    'https://firefly.pyrocufflink.blue',
)
# NOTE: the environment variable is FIREFLY_IMPORT_URL, not
# FIREFLY_III_IMPORTER_URL.
FIREFLY_III_IMPORTER_URL = os.getenv(
    'FIREFLY_IMPORT_URL',
    'https://firefly-importer.pyrocufflink.blue',
)

# Location hints for the secrets socket (both are None when unset).
SECRET_SOCKET_PATH = os.getenv('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.getenv('XDG_RUNTIME_DIR')

# Bank name -> Firefly III account id.  For Commerce Bank the value is a
# mapping of the account name used on the bank site to the Firefly III id.
ACCOUNTS = {
    'commerce': {
        '8357': 1,
        '7730': 67,
    },
    'chase': 15,
}
|
|
|
|
|
|
class FireflyImporter:
    """Uploads CSV files to a Firefly III importer ``/autoupload`` endpoint."""

    def __init__(
        self,
        url: str,
        secret: str,
        auth: Optional[tuple[str, str]],
    ) -> None:
        """Store the connection settings.

        :param url: base URL of the importer; a trailing slash is tolerated
        :param secret: value sent as the ``secret`` query parameter
        :param auth: optional ``(username, password)`` pair handed to httpx
        """
        self.url = url
        self.secret = secret
        self.auth = auth
        # No long-lived HTTP client is created here: import_csv() opens
        # (and closes) its own client, so one created in the constructor
        # would never be used or closed.

    async def import_csv(
        self,
        csv: Path,
        config: dict[str, Any],
    ) -> None:
        """POST *csv* and its JSON-encoded *config* to the importer.

        :param csv: path of the CSV file to upload
        :param config: import configuration, serialized with ``json.dumps``
        :raises httpx.HTTPStatusError: if the importer returns an error status
        """
        log.debug('Importing transactions from %s to Firefly III', csv)
        url = f'{self.url.rstrip("/")}/autoupload'
        with csv.open('rb') as f:
            async with httpx.AsyncClient(auth=self.auth) as client:
                r = await client.post(
                    url,
                    params={
                        'secret': self.secret,
                    },
                    headers={
                        'Accept': 'application/json',
                    },
                    files={
                        'importable': ('import.csv', f),
                        'json': ('import.json', json.dumps(config)),
                    },
                    # Imports can be slow; allow up to five minutes.
                    timeout=300,
                )
                r.raise_for_status()
|
|
|
|
|
|
class SecretsClient:
    """Client for a line-oriented secrets service on a UNIX socket.

    A request is the key followed by a newline; the reply is read in a
    single chunk and has trailing newlines stripped.
    """

    def __init__(self) -> None:
        # Declared but deliberately left unset until connect() runs;
        # "not connected" is detected with hasattr().
        self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]

    async def __aenter__(self) -> 'SecretsClient':
        """Connect (if not already connected) and return the client."""
        if not hasattr(self, 'sock'):
            await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        """Close the connection; never suppresses exceptions."""
        sock = getattr(self, 'sock', None)
        if sock is None:
            return False
        # Forget the stream pair so a later __aenter__ reconnects instead
        # of reusing a closed socket.
        del self.sock
        writer = sock[1]
        writer.close()
        try:
            await writer.wait_closed()
        except OSError as e:
            log.debug('Error while closing secrets socket: %s', e)
        return False

    async def connect(self) -> None:
        """Open the socket.

        The path is taken from ``SECRET_SOCKET_PATH``, else
        ``$XDG_RUNTIME_DIR/secretsocket/.ss``, else ``.secretsocket`` in
        the current directory.
        """
        if SECRET_SOCKET_PATH:
            path = Path(SECRET_SOCKET_PATH)
        elif XDG_RUNTIME_DIR:
            path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
        else:
            path = Path('.secretsocket')
        self.sock = await asyncio.open_unix_connection(str(path))

    async def get_secret(self, key: str) -> bytes:
        """Request *key* and return the raw reply without trailing newlines.

        :raises ConnectionError: if the peer closed the connection
        """
        reader, writer = self.sock
        writer.write(f'{key}\n'.encode('utf-8'))
        # Make sure the request has actually been flushed to the socket.
        await writer.drain()
        buf = await reader.read(64 * 2**10)
        if not buf:
            # read() only returns b'' at EOF; do not mistake a closed
            # connection for an empty secret.
            raise ConnectionError(
                f'Secrets socket closed while reading {key!r}'
            )
        return buf.rstrip(b'\n')
|
|
|
|
|
|
async def ntfy(
    message: Optional[str] = None,
    topic: str = NTFY_TOPIC,
    title: Optional[str] = None,
    tags: Optional[str] = None,
    attach: Optional[bytes] = None,
    filename: Optional[str] = None,
) -> None:
    """Publish a notification to ``{NTFY_URL}/{topic}``.

    Without an attachment the message is POSTed as the request body.
    With an attachment the bytes are PUT as the body and the message (if
    any) travels in the ``Message`` header: RFC 2047 base64-encoded when
    it is not pure ASCII, otherwise with newlines escaped as ``\\n``.

    At least one of *message* and *attach* must be given.
    """
    assert message or attach
    headers = {'Title': title if title else 'xactfetch'}
    if tags:
        headers['Tags'] = tags
    if attach and filename:
        headers['Filename'] = filename
    if attach and message:
        if message.isascii():
            message = message.replace('\n', '\\n')
        else:
            message = rfc2047_base64encode(message)
        headers['Message'] = message

    target = f'{NTFY_URL}/{topic}'
    async with httpx.AsyncClient() as client:
        if attach:
            r = await client.put(target, headers=headers, content=attach)
        else:
            r = await client.post(target, headers=headers, content=message)
    r.raise_for_status()
|
|
|
|
|
|
def rfc2047_base64encode(message: str) -> str:
    """Wrap *message* as a UTF-8, base64 ``=?UTF-8?B?...?=`` encoded word."""
    payload = base64.b64encode(message.encode('utf-8'))
    return '=?UTF-8?B?' + payload.decode('ascii') + '?='
|
|
|
|
|
|
async def get_last_transaction_date(key: int, token: str) -> datetime.date:
    """Return the newest transaction date Firefly III reports for an account.

    Fetches ``/api/v1/accounts/{key}/transactions`` with *token* as a
    bearer token and scans every split in the response.  Splits whose
    date is missing or unparsable are logged and skipped.  When nothing
    usable is found the result is the date part of ``datetime.min``.
    """
    endpoint = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/vnd.api+json',
    }
    async with httpx.AsyncClient() as client:
        r = await client.get(endpoint, headers=request_headers, timeout=10)
        r.raise_for_status()

    newest = datetime.datetime.min
    for xact in r.json()['data']:
        for split in xact['attributes']['transactions']:
            try:
                # Keep only the date part of the ISO timestamp.
                day = split['date'].split('T')[0]
                parsed = datetime.datetime.fromisoformat(day)
            except (KeyError, ValueError) as e:
                log.warning(
                    'Could not parse date from transaction %s: %s',
                    xact['id'],
                    e,
                )
            else:
                newest = max(newest, parsed)
    return newest.date()
|
|
|
|
|
|
async def download_chase(
    page: Page,
    secrets: SecretsClient,
    end_date: datetime.date,
    token: str,
    importer: FireflyImporter,
) -> bool:
    """Download Chase transactions up to *end_date* and import them.

    Returns ``False`` when the last transaction date cannot be looked up
    or when an exception was swallowed by the ``ntfyerror`` guard, and
    ``True`` otherwise (including when there is nothing new to fetch).
    """
    async with Chase(page, secrets) as bank:
        async with ntfyerror('Chase', page) as outcome:
            account_id = ACCOUNTS['chase']
            try:
                since = await get_last_transaction_date(account_id, token)
            except (OSError, ValueError) as err:
                log.error(
                    'Skipping Chase account: could not get last transaction: %s',
                    err,
                )
                return False
            if since > end_date:
                log.info(
                    'Skipping Chase account: last transaction was %s',
                    since,
                )
                return True
            await bank.login()
            downloaded = await bank.download_transactions(since, end_date)
            log.info('Importing transactions from Chase into Firefly III')
            await bank.firefly_import(downloaded, account_id, importer)
    return outcome.success
|
|
|
|
|
|
async def download_commerce(
    page: Page,
    secrets: SecretsClient,
    end_date: datetime.date,
    token: str,
    importer: FireflyImporter,
) -> bool:
    """Download and import transactions for every Commerce Bank account.

    Accounts whose last transaction date cannot be looked up, or is
    already past *end_date*, are skipped.  All downloads happen first;
    the imports into Firefly III follow.  Returns ``False`` only when the
    ``ntfyerror`` guard swallowed an exception.
    """
    log.info('Downloading transaction lists from Commerce Bank')
    downloads = []
    async with CommerceBank(page, secrets) as bank:
        async with ntfyerror('Commerce Bank', page) as outcome:
            for name, account_id in ACCOUNTS['commerce'].items():
                try:
                    since = await get_last_transaction_date(account_id, token)
                except (OSError, ValueError) as err:
                    log.error(
                        'Skipping account %s: could not get last transaction: %s',
                        name,
                        err,
                    )
                    continue
                if since > end_date:
                    log.info(
                        'Skipping account %s: last transaction was %s',
                        name,
                        since,
                    )
                    continue
                log.info(
                    'Getting transactions since %s for account xxx%s',
                    since,
                    name,
                )
                # login() is a no-op once the session is established.
                await bank.login()
                await bank.open_account(name)
                path = await bank.download_transactions(since, end_date)
                downloads.append((account_id, path))
            log.info(
                'Importing transactions from Commerce Bank into Firefly III'
            )
            for account_id, path in downloads:
                await bank.firefly_import(path, account_id, importer)
    return outcome.success
|
|
|
|
|
|
class ntfyerror:
    """Async context manager that reports failures instead of raising.

    An ``Exception`` raised in the body is logged, ``success`` is set to
    ``False``, and (unless ``DEBUG_NTFY=0``) a notification with a browser
    screenshot is sent.  The exception is then suppressed so the caller
    can carry on with the next bank.
    """

    def __init__(self, bank: str, page: Page) -> None:
        self.bank = bank
        self.page = page
        self.success = True

    async def __aenter__(self) -> 'ntfyerror':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        """Handle an exception from the body; see the class docstring."""
        if not (exc_type and exc_value and tb):
            return True
        self.success = False
        if not issubclass(exc_type, Exception):
            # KeyboardInterrupt, SystemExit and asyncio.CancelledError
            # are requests to stop, not bank failures: let them through.
            return False
        log.exception(
            'Swallowed exception:', exc_info=(exc_type, exc_value, tb)
        )
        if os.environ.get('DEBUG_NTFY', '1') == '0':
            return True
        try:
            if ss := await self.page.screenshot():
                await asyncio.to_thread(save_screenshot, ss)
        except Exception:
            log.exception('Failed to get screenshot:')
            ss = None
        try:
            await ntfy(
                message=str(exc_value),
                title=f'xactfetch failed for {self.bank}',
                tags='warning',
                attach=ss,
                filename='screenshot.png',
            )
        except Exception:
            # A broken notification channel must not escape from here
            # and abort the remaining banks.
            log.exception('Failed to send failure notification:')
        return True
|
|
|
|
|
|
def save_screenshot(screenshot: bytes) -> None:
    """Write *screenshot* to a timestamped PNG in the current directory.

    Failures are logged, never raised.
    """
    filename = datetime.datetime.now().strftime('screenshot_%Y%m%d%H%M%S.png')
    log.debug('Saving browser screenshot to %s', filename)
    try:
        Path(filename).write_bytes(screenshot)
    except Exception as err:
        log.error('Failed to save browser screenshot: %s', err)
        return
    log.info('Browser screenshot saved as %s', filename)
|
|
|
|
|
|
class CommerceBank:
    """Drives the Commerce Bank web site through a Playwright page.

    Used as an async context manager; leaving the context logs out if a
    login happened.
    """

    URL = 'https://banking.commercebank.com/CBI/Auth/Login'
    # Importer configuration template; firefly_import() copies it and
    # adds 'default_account'.
    IMPORT_CONFIG = {
        'version': 3,
        'source': 'fidi-1.2.2',
        'created_at': '2023-04-27T08:05:10-05:00',
        'date': 'n/j/Y',
        'delimiter': 'comma',
        'headers': True,
        'rules': True,
        'skip_form': False,
        'add_import_tag': True,
        'roles': [
            'date_transaction',
            'internal_reference',
            'description',
            'amount_debit',
            'amount_credit',
        ],
        'do_mapping': [
            False,
            False,
            False,
            False,
            False,
        ],
        'mapping': [],
        'duplicate_detection_method': 'classic',
        'ignore_duplicate_lines': False,
        'unique_column_index': 0,
        'unique_column_type': 'internal_reference',
        'flow': 'file',
        'identifier': '0',
        'connection': '0',
        'ignore_spectre_categories': False,
        'map_all_data': False,
        'accounts': [],
        'date_range': '',
        'date_range_number': 30,
        'date_range_unit': 'd',
        'date_not_before': '',
        'date_not_after': '',
        'nordigen_country': '',
        'nordigen_bank': '',
        'nordigen_requisitions': [],
        'nordigen_max_days': '90',
        'conversion': False,
        'ignore_duplicate_transactions': True,
    }

    def __init__(self, page: Page, secrets: SecretsClient) -> None:
        self.page = page
        self.secrets = secrets
        self._logged_in = False

    async def __aenter__(self) -> 'CommerceBank':
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> None:
        await self.logout()

    async def login(self) -> None:
        """Log in with username, password and OTP; no-op if already done."""
        if self._logged_in:
            return
        log.debug('Navigating to %s', self.URL)
        await self.page.goto(self.URL)
        username = await self.get_secret('bank.commerce.username')
        password = await self.get_secret('bank.commerce.password')
        log.debug('Filling username/password login form')
        await self.page.get_by_role('textbox', name='Customer ID').fill(
            username
        )
        await self.page.get_by_role('textbox', name='Password').fill(password)
        await self.page.get_by_role('button', name='Log In').click()
        log.debug('Waiting for OTP 2FA form')
        otp_input = self.page.locator('id=securityCodeInput')
        await otp_input.wait_for()
        # Random pause before the OTP is requested and typed.
        await self.page.wait_for_timeout(random.randint(1000, 3000))
        log.debug('Filling OTP 2FA form')
        otp = await self.get_secret('bank.commerce.otp')
        await otp_input.fill(otp)
        async with self.page.expect_event('load'):
            await self.page.get_by_role('button', name='Continue').click()
        log.debug('Waiting for page load')
        await self.page.wait_for_load_state()
        # If the site landed somewhere other than the account summary,
        # navigate there explicitly.
        cur_url = urllib.parse.urlparse(self.page.url)
        if cur_url.path != '/CBI/Accounts/Summary':
            new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
            await self.page.goto(urllib.parse.urlunparse(new_url))
        log.info('Successfully logged in to Commerce Bank')
        self._logged_in = True

    async def logout(self) -> None:
        """Log out if a login happened; otherwise do nothing."""
        if not self._logged_in:
            return
        log.debug('Logging out of Commerce Bank')
        async with self.page.expect_event('load'):
            await self.page.get_by_test_id('navWrap').get_by_text(
                'Logout'
            ).click()
        # Keep the flag in step with the session so a repeated logout()
        # is a no-op and a later login() really logs in again.
        self._logged_in = False
        log.info('Logged out of Commerce Bank')

    async def open_account(self, account: str) -> None:
        """Open the activity page of the account linked as *account*."""
        log.debug('Navigating to activity page for account %s', account)
        if '/Activity/' in self.page.url:
            # Already on an activity page: open the account menu first.
            await self.page.get_by_role('button', name='My Accounts').click()
        async with self.page.expect_event('load'):
            await self.page.get_by_role('link', name=account).click()
        await self.page.wait_for_load_state()
        await self.page.wait_for_timeout(random.randint(1000, 3000))
        log.info('Loaded activity page for account %s', account)

    async def download_transactions(
        self, from_date: datetime.date, to_date: datetime.date
    ) -> Path:
        """Download the open account's transactions as CSV.

        :returns: path of the downloaded file as reported by Playwright
        """
        log.info('Downloading transactions from %s to %s', from_date, to_date)
        datefmt = '%m/%d/%Y'
        await self.page.get_by_role(
            'link', name='Download Transactions'
        ).click()
        await self.page.wait_for_timeout(random.randint(750, 1250))
        modal = self.page.locator('#download-transactions')
        # Each date field is cleared with select-all + delete, then typed.
        input_from = modal.locator('input[data-qaid=fromDate]')
        await input_from.click()
        await self.page.keyboard.press('Control+A')
        await self.page.keyboard.press('Delete')
        await self.page.keyboard.type(from_date.strftime(datefmt))
        input_to = modal.locator('input[data-qaid=toDate]')
        await input_to.click()
        await self.page.keyboard.press('Control+A')
        await self.page.keyboard.press('Delete')
        await self.page.keyboard.type(to_date.strftime(datefmt))
        await modal.get_by_role('button', name='Select Type').click()
        await self.page.get_by_text('Comma Separated').click()
        async with self.page.expect_download() as di:
            await self.page.get_by_role('button', name='Download').click()
        log.debug('Waiting for download to complete')
        path = await (await di.value).path()
        assert path
        log.info('Downloaded transactions to %s', path)
        await modal.get_by_label('Close').click()
        return path

    async def firefly_import(
        self, csv: Path, account: int, importer: FireflyImporter
    ) -> None:
        """Import *csv* into Firefly III with *account* as default account."""
        # Deep copy so the class-level template is never mutated.
        config = copy.deepcopy(self.IMPORT_CONFIG)
        config['default_account'] = account
        await importer.import_csv(csv, config)

    async def get_secret(self, key: str) -> str:
        """Fetch *key* from the secrets client and decode it to ``str``."""
        secret = await self.secrets.get_secret(key)
        return secret.decode()
|
|
|
|
|
|
class Chase:
    """Drives the Chase web site through a Playwright page.

    Used as an async context manager: entering loads saved cookies into
    the browser context; leaving logs out (if logged in) and always
    saves the cookies again.
    """

    URL = 'https://secure26ea.chase.com/web/auth/dashboard'
    # Importer configuration template; firefly_import() copies it, adds
    # 'default_account' and may drop the first column entry.
    IMPORT_CONFIG = {
        'version': 3,
        'source': 'fidi-1.2.2',
        'created_at': '2023-04-27T09:54:42-05:00',
        'date': 'n/j/Y',
        'delimiter': 'comma',
        'headers': True,
        'rules': True,
        'skip_form': False,
        'add_import_tag': True,
        'roles': [
            '_ignore',
            'date_transaction',
            'date_process',
            'description',
            'tags-comma',
            '_ignore',
            'amount',
            'note',
        ],
        'do_mapping': [
            False,
            False,
            False,
            False,
            False,
            False,
            False,
            False,
        ],
        'mapping': [],
        'duplicate_detection_method': 'classic',
        'ignore_duplicate_lines': True,
        'unique_column_index': 0,
        'unique_column_type': 'internal_reference',
        'flow': 'file',
        'identifier': '0',
        'connection': '0',
        'ignore_spectre_categories': False,
        'map_all_data': True,
        'accounts': [],
        'date_range': '',
        'date_range_number': 30,
        'date_range_unit': 'd',
        'date_not_before': '',
        'date_not_after': '',
        'nordigen_country': '',
        'nordigen_bank': '',
        'nordigen_requisitions': [],
        'nordigen_max_days': '90',
        'conversion': False,
        'ignore_duplicate_transactions': True,
    }

    def __init__(self, page: Page, secrets: SecretsClient) -> None:
        self.page = page
        self.secrets = secrets
        # Cookie jar, relative to the current working directory.
        self.saved_cookies = Path('cookies.json')
        self._logged_in = False

    async def __aenter__(self) -> 'Chase':
        await self.load_cookies()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_value: Optional[Exception],
        tb: Optional[TracebackType],
    ) -> None:
        try:
            await self.logout()
        finally:
            # Save cookies even when logging out failed.
            await self.save_cookies()

    async def load_cookies(self) -> None:
        """Best-effort: add cookies from the saved file to the context."""
        log.debug('Loading saved cookies from %s', self.saved_cookies)
        try:
            with self.saved_cookies.open(encoding='utf-8') as f:
                cookies = await asyncio.to_thread(json.load, f)
            await self.page.context.add_cookies(cookies)
        except Exception as e:
            log.debug('Failed to load saved cookies: %s', e)
        else:
            log.info('Successfully loaded saved cookies')

    async def save_cookies(self) -> None:
        """Best-effort: write the context's cookies to the saved file."""
        log.debug('Saving cookies to %s', self.saved_cookies)
        try:
            # Fetch and serialize BEFORE opening the file for writing:
            # opening with 'w' truncates it, so a failure while fetching
            # would otherwise wipe the previously saved cookies.
            cookies = await self.page.context.cookies()
            data = await asyncio.to_thread(json.dumps, cookies)
            with self.saved_cookies.open('w', encoding='utf-8') as f:
                f.write(data)
        except Exception as e:
            log.error('Failed to save cookies: %s', e)
        else:
            log.info('Successfully saved cookies to %s', self.saved_cookies)

    async def login(self) -> None:
        """Log in, handling the SMS device-verification step if shown."""
        if self._logged_in:
            return
        log.debug('Navigating to %s', self.URL)
        await self.page.goto(self.URL)
        await self.page.wait_for_load_state()
        await self.page.wait_for_timeout(random.randint(2000, 4000))
        username = await self.get_secret('bank.chase.username')
        password = await self.get_secret('bank.chase.password')
        log.debug('Filling username/password login form')
        logonbox = self.page.frame_locator('#logonbox')
        await logonbox.get_by_label('Username').fill(username)
        await logonbox.get_by_label('Password').fill(password)
        await self.page.wait_for_timeout(random.randint(500, 750))
        await logonbox.get_by_role('button', name='Sign in').click()
        log.debug('Waiting for page load')
        await self.page.wait_for_load_state()
        logonframe = self.page.frame_locator('iframe[title="logon"]')
        # Race two outcomes: the device-verification prompt, or the
        # dashboard (recognized by its 'Pay Card' button).
        t_2fa = asyncio.create_task(
            logonframe.get_by_role(
                'heading', name="We don't recognize this device"
            ).wait_for()
        )
        t_finished = asyncio.create_task(
            self.page.get_by_role('button', name='Pay Card').wait_for()
        )
        done, pending = await asyncio.wait(
            (t_2fa, t_finished),
            return_when=asyncio.FIRST_COMPLETED,
        )
        for t in pending:
            t.cancel()
        for t in done:
            # Re-raise whatever the finished waiter raised (e.g. timeout).
            await t
        if t_2fa in done:
            log.warning('Device verification (SMS 2-factor auth) required')
            await logonframe.get_by_label('Tell us how: Choose one').click()
            await logonframe.locator(
                '#container-1-simplerAuth-dropdownoptions-styledselect'
            ).click()
            # The OTP request is started before 'Next' is clicked.
            otp_task = asyncio.create_task(self.get_secret('bank.chase.otp'))
            await logonframe.get_by_role('button', name='Next').click()
            log.info('Waiting for SMS verification code')
            otp = await otp_task
            log.debug('Filling verification code form')
            await logonframe.get_by_label('One-time code').fill(otp)
            await logonframe.get_by_label('Password').fill(password)
            await logonframe.get_by_role('button', name='Next').click()
            await self.page.wait_for_load_state()
            await self.page.get_by_role('button', name='Pay Card').wait_for()
        log.info('Successfully logged in to Chase')
        self._logged_in = True

    async def download_transactions(
        self, from_date: datetime.date, to_date: datetime.date
    ) -> Path:
        """Download card activity between the two dates as a file.

        :returns: path of the downloaded file as reported by Playwright
        """
        log.info('Downloading transactions from %s to %s', from_date, to_date)
        fmt = '%m/%d/%Y'
        # NOTE: the card is selected by a hard-coded button label.
        await self.page.locator('#CARD_ACCOUNTS').get_by_role(
            'button', name='(...2467)'
        ).first.click()
        fl = self.page.locator('#flyout')
        await fl.wait_for()
        # Wait for several flyout controls to appear before interacting.
        await fl.get_by_role('button', name='Pay card', exact=True).wait_for()
        await fl.get_by_role(
            'button', name='Account activity', exact=True
        ).wait_for()
        await fl.get_by_role('link', name='Show details').wait_for()
        await fl.get_by_role(
            'link', name='Activity since last statement'
        ).click()
        await fl.get_by_role('link', name='All transactions').click()
        await fl.get_by_text('See more activity').wait_for()
        await fl.get_by_role(
            'button', name='Download Account Activity'
        ).click()
        log.debug('Filling account activity download form')
        await self.page.locator(
            '#select-downloadActivityOptionId-label'
        ).click()
        await self.page.get_by_text('Choose a date range').nth(1).locator(
            '../..'
        ).click()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        await self.page.locator('#accountActivityFromDate-input-input').fill(
            from_date.strftime(fmt)
        )
        await self.page.locator('#accountActivityFromDate-input-input').blur()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        await self.page.locator('#accountActivityToDate-input-input').fill(
            to_date.strftime(fmt)
        )
        await self.page.locator('#accountActivityToDate-input-input').blur()
        await self.page.wait_for_timeout(random.randint(500, 1500))
        async with self.page.expect_download(timeout=5000) as di:
            await self.page.get_by_role(
                'button', name='Download', exact=True
            ).click()
        log.debug('Waiting for download to complete')
        await self.page.wait_for_timeout(random.randint(1000, 2500))
        path = await (await di.value).path()
        assert path
        log.info('Downloaded transactions to %s', path)
        return path

    async def logout(self) -> None:
        """Sign out if a login happened; otherwise do nothing."""
        if not self._logged_in:
            return
        log.debug('Logging out of Chase')
        async with self.page.expect_event('load'):
            await self.page.get_by_role('button', name='Sign out').click()
        # Keep the flag in step with the session so a repeated logout()
        # is a no-op and a later login() really logs in again.
        self._logged_in = False
        log.info('Logged out of Chase')

    async def firefly_import(
        self, csv: Path, account: int, importer: FireflyImporter
    ) -> None:
        """Import *csv* into Firefly III with *account* as default account.

        Two CSV layouts are accepted, told apart by the header line: one
        starting with a ``Card`` column, and one with exactly six commas,
        for which the leading column entry is dropped from the config.

        :raises ValueError: if the header line matches neither layout
        """
        # Deep copy so the class-level template is never mutated.
        config = copy.deepcopy(self.IMPORT_CONFIG)
        config['default_account'] = account
        with csv.open('r', encoding='utf-8') as f:
            headers = f.readline()
        if headers.startswith('Card'):
            log.debug('Detected CSV schema with Card column')
        elif headers.count(',') == 6:
            log.debug('Detected CSV schema without Card column')
            config['roles'].pop(0)
            config['do_mapping'].pop(0)
        else:
            raise ValueError(f'Unexpected CSV schema: {headers}')
        await importer.import_csv(csv, config)

    async def get_secret(self, key: str) -> str:
        """Fetch *key* from the secrets client and decode it to ``str``."""
        secret = await self.secrets.get_secret(key)
        return secret.decode()
|
|
|
|
|
|
async def fetch_transactions(pw: Playwright, secrets: SecretsClient) -> bool:
    """Fetch and import transactions for the selected banks.

    Banks are taken from the command-line arguments, defaulting to every
    key of ``ACCOUNTS``.  When any bank fails, the Playwright trace is
    written to ``trace.zip`` and sent as a notification attachment.

    :returns: ``True`` if at least one bank failed, ``False`` otherwise
    """
    log.debug('Getting Firefly III access token')
    # Secrets are requested one at a time over the shared connection.
    token = (await secrets.get_secret('firefly.token')).decode()
    import_secret = (
        await secrets.get_secret('firefly.import.secret')
    ).decode()
    import_auth = (
        (await secrets.get_secret('firefly.import.username')).decode(),
        (await secrets.get_secret('firefly.import.password')).decode(),
    )
    importer = FireflyImporter(
        FIREFLY_III_IMPORTER_URL, import_secret, import_auth
    )
    # Only fetch up to yesterday.
    end_date = datetime.date.today() - datetime.timedelta(days=1)
    failed = False
    banks = sys.argv[1:] or list(ACCOUNTS.keys())
    for bank in banks:
        if bank not in ACCOUNTS:
            # A mistyped name would otherwise be skipped silently and
            # the run would look successful.
            log.warning('Ignoring unknown bank %r', bank)
    browser = await pw.chromium.launch(headless=False)
    context = await browser.new_context()
    await context.tracing.start(screenshots=True, snapshots=True)
    page = await context.new_page()
    if 'commerce' in banks:
        if not await download_commerce(
            page, secrets, end_date, token, importer
        ):
            failed = True
    if 'chase' in banks:
        if not await download_chase(page, secrets, end_date, token, importer):
            failed = True
    if failed:
        await context.tracing.stop(path='trace.zip')
        with open('trace.zip', 'rb') as f:
            await ntfy(
                'Downloading one or more transaction lists failed.',
                attach=f.read(),
                filename='trace.zip',
            )
    return failed
|
|
|
|
|
|
async def amain() -> None:
    """Async entry point: run the fetch and exit with its status.

    :raises SystemExit: always; code 1 if any bank failed, otherwise 0
    """
    logging.basicConfig(level=logging.DEBUG)

    failed = False
    async with SecretsClient() as secrets:
        try:
            async with async_playwright() as pw:
                failed = await fetch_transactions(pw, secrets)
        except asyncio.exceptions.InvalidStateError as e:
            log.debug('Ignoring exception: %s', e, exc_info=sys.exc_info())
    # Exit only after the Playwright context has been torn down.  If
    # SystemExit were raised inside it, an InvalidStateError raised
    # during teardown would replace it and the failure status would be
    # lost (the process would exit 0).
    raise SystemExit(1 if failed else 0)
|
|
|
|
|
|
def main() -> None:
    """Synchronous entry point: run ``amain()`` on a new event loop."""
    coro = amain()
    asyncio.run(coro)


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|