1
0
Fork 0

Switch to async API

Using the Playwright async API is the only way to wait for one of
multiple conditions.  We will need this capability in order to detect
certain abnormal conditions, such as spurious 2FA auth or interstitial
ads.
master
Dustin 2024-07-08 18:12:22 -05:00
parent b30b38f76f
commit 43aba0c848
2 changed files with 219 additions and 175 deletions

View File

@ -11,6 +11,7 @@ classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = [
"httpx~=0.27.0",
"playwright~=1.32",
"requests~=2.29.0",
]
@ -28,3 +29,7 @@ build-backend = "setuptools.build_meta"
[tool.pyright]
venvPath = '.'
venv = '.venv'
[tool.black]
line-length = 79
skip-string-normalization = true

View File

@ -1,3 +1,4 @@
import asyncio
import base64
import copy
import datetime
@ -5,16 +6,15 @@ import json
import logging
import os
import random
import socket
import sys
import urllib.parse
from pathlib import Path
from types import TracebackType
from typing import Any, Optional, Type
import requests
from playwright.sync_api import Page
from playwright.sync_api import sync_playwright
import httpx
from playwright.async_api import Page
from playwright.async_api import async_playwright
log = logging.getLogger('xactfetch')
@ -46,8 +46,9 @@ class FireflyImporter:
self.url = url
self.secret = secret
self.auth = auth
self.client = httpx.AsyncClient()
def import_csv(
async def import_csv(
self,
csv: Path,
config: dict[str, Any],
@ -55,58 +56,58 @@ class FireflyImporter:
log.debug('Importing transactions from %s to Firefly III', csv)
url = f'{self.url.rstrip("/")}/autoupload'
with csv.open('rb') as f:
r = requests.post(
async with httpx.AsyncClient(auth=self.auth) as client:
r = await client.post(
url,
auth=self.auth,
headers={
'Accept': 'application/json',
},
params={
'secret': self.secret,
},
headers={
'Accept': 'application/json',
},
files={
'importable': ('import.csv', f),
'json': ('import.json', json.dumps(config)),
},
timeout=300,
)
r.raise_for_status()
class SecretsClient:
def __init__(self) -> None:
self.sock: socket.socket
self.sock: tuple[asyncio.StreamReader, asyncio.StreamWriter]
def __enter__(self) -> 'SecretsClient':
async def __aenter__(self) -> 'SecretsClient':
if not hasattr(self, 'sock'):
self.connect()
await self.connect()
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> bool:
self.sock.close()
self.sock[1].close()
return False
def connect(self) -> None:
async def connect(self) -> None:
if SECRET_SOCKET_PATH:
path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
path = '.secretsocket'
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(str(path))
self.sock = await asyncio.open_unix_connection(str(path))
def get_secret(self, key: str) -> bytes:
self.sock.send(f'{key}\n'.encode('utf-8'))
buf = self.sock.recv(64 * 2**10)
async def get_secret(self, key: str) -> bytes:
self.sock[1].write(f'{key}\n'.encode('utf-8'))
buf = await self.sock[0].read(64 * 2**10)
return buf.rstrip(b'\n')
def ntfy(
async def ntfy(
message: Optional[str] = None,
topic: str = NTFY_TOPIC,
title: Optional[str] = None,
@ -121,6 +122,7 @@ def ntfy(
if tags:
headers['Tags'] = tags
url = f'{NTFY_URL}/{topic}'
client = httpx.AsyncClient()
if attach:
if filename:
headers['Filename'] = filename
@ -132,16 +134,18 @@ def ntfy(
else:
message = message.replace('\n', '\\n')
headers['Message'] = message
r = requests.put(
async with client:
r = await client.put(
url,
headers=headers,
data=attach,
content=attach,
)
else:
r = requests.post(
async with client:
r = await client.post(
url,
headers=headers,
data=message,
content=message,
)
r.raise_for_status()
@ -153,9 +157,10 @@ def rfc2047_base64encode(
return f"=?UTF-8?B?{encoded}?="
def get_last_transaction_date(key: int, token: str) -> datetime.date:
async def get_last_transaction_date(key: int, token: str) -> datetime.date:
url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
r = requests.get(
async with httpx.AsyncClient() as client:
r = await client.get(
url,
headers={
'Authorization': f'Bearer {token}',
@ -181,17 +186,17 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date:
return last_date.date()
def download_chase(
async def download_chase(
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool:
with Chase(page, secrets) as c, ntfyerror('Chase', page) as r:
async with Chase(page, secrets) as c, ntfyerror('Chase', page) as r:
key = ACCOUNTS['chase']
try:
start_date = get_last_transaction_date(key, token)
start_date = await get_last_transaction_date(key, token)
except (OSError, ValueError) as e:
log.error(
'Skipping Chase account: could not get last transaction: %s',
@ -204,14 +209,14 @@ def download_chase(
start_date,
)
return True
c.login()
csv = c.download_transactions(start_date, end_date)
await c.login()
csv = await c.download_transactions(start_date, end_date)
log.info('Importing transactions from Chase into Firefly III')
c.firefly_import(csv, key, importer)
await c.firefly_import(csv, key, importer)
return r.success
def download_commerce(
async def download_commerce(
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
@ -220,12 +225,12 @@ def download_commerce(
) -> bool:
log.info('Downloading transaction lists from Commerce Bank')
csvs = []
with CommerceBank(page, secrets) as c, ntfyerror(
async with CommerceBank(page, secrets) as c, ntfyerror(
'Commerce Bank', page
) as r:
for name, key in ACCOUNTS['commerce'].items():
try:
start_date = get_last_transaction_date(key, token)
start_date = await get_last_transaction_date(key, token)
except (OSError, ValueError) as e:
log.error(
'Skipping account %s: could not get last transaction: %s',
@ -245,12 +250,14 @@ def download_commerce(
start_date,
name,
)
c.login()
c.open_account(name)
csvs.append((key, c.download_transactions(start_date, end_date)))
await c.login()
await c.open_account(name)
csvs.append(
(key, await c.download_transactions(start_date, end_date))
)
log.info('Importing transactions from Commerce Bank into Firefly III')
for key, csv in csvs:
c.firefly_import(csv, key, importer)
await c.firefly_import(csv, key, importer)
return r.success
@ -260,10 +267,10 @@ class ntfyerror:
self.page = page
self.success = True
def __enter__(self) -> 'ntfyerror':
async def __aenter__(self) -> 'ntfyerror':
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
@ -276,9 +283,9 @@ class ntfyerror:
)
if os.environ.get('DEBUG_NTFY', '1') == '0':
return True
if ss := self.page.screenshot():
save_screenshot(ss)
ntfy(
if ss := await self.page.screenshot():
await asyncio.to_thread(save_screenshot, ss)
await ntfy(
message=str(exc_value),
title=f'xactfetch failed for {self.bank}',
tags='warning',
@ -356,99 +363,109 @@ class CommerceBank:
self.secrets = secrets
self._logged_in = False
def __enter__(self) -> 'CommerceBank':
async def __aenter__(self) -> 'CommerceBank':
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> None:
self.logout()
await self.logout()
def login(self) -> None:
async def login(self) -> None:
if self._logged_in:
return
log.debug('Navigating to %s', self.URL)
self.page.goto(self.URL)
username = self.secrets.get_secret('bank.commerce.username').decode()
password = self.secrets.get_secret('bank.commerce.password').decode()
await self.page.goto(self.URL)
username = await self.get_secret('bank.commerce.username')
password = await self.get_secret('bank.commerce.password')
log.debug('Filling username/password login form')
self.page.get_by_role('textbox', name='Customer ID').fill(username)
self.page.get_by_role('textbox', name='Password').fill(password)
self.page.get_by_role('button', name='Log In').click()
await self.page.get_by_role('textbox', name='Customer ID').fill(
username
)
await self.page.get_by_role('textbox', name='Password').fill(password)
await self.page.get_by_role('button', name='Log In').click()
log.debug('Waiting for OTP 2FA form')
otp_input = self.page.locator('id=securityCodeInput')
otp_input.wait_for()
self.page.wait_for_timeout(random.randint(1000, 3000))
await otp_input.wait_for()
await self.page.wait_for_timeout(random.randint(1000, 3000))
log.debug('Filling OTP 2FA form')
otp = self.secrets.get_secret('bank.commerce.otp').decode()
otp_input.fill(otp)
with self.page.expect_event('load'):
self.page.get_by_role('button', name='Continue').click()
otp = await self.get_secret('bank.commerce.otp')
await otp_input.fill(otp)
async with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Continue').click()
log.debug('Waiting for page load')
self.page.wait_for_load_state()
await self.page.wait_for_load_state()
cur_url = urllib.parse.urlparse(self.page.url)
if cur_url.path != '/CBI/Accounts/Summary':
new_url = cur_url._replace(path='/CBI/Accounts/Summary', query='')
self.page.goto(urllib.parse.urlunparse(new_url))
await self.page.goto(urllib.parse.urlunparse(new_url))
log.info('Successfully logged in to Commerce Bank')
self._logged_in = True
def logout(self) -> None:
async def logout(self) -> None:
if not self._logged_in:
return
log.debug('Logging out of Commerce Bank')
with self.page.expect_event('load'):
self.page.get_by_test_id('navWrap').get_by_text('Logout').click()
async with self.page.expect_event('load'):
await self.page.get_by_test_id('navWrap').get_by_text(
'Logout'
).click()
log.info('Logged out of Commerce Bank')
def open_account(self, account: str) -> None:
async def open_account(self, account: str) -> None:
log.debug('Navigating to activity page for account %s', account)
if '/Activity/' in self.page.url:
self.page.get_by_role('button', name='My Accounts').click()
with self.page.expect_event('load'):
self.page.get_by_role('link', name=account).click()
self.page.wait_for_load_state()
self.page.wait_for_timeout(random.randint(1000, 3000))
await self.page.get_by_role('button', name='My Accounts').click()
async with self.page.expect_event('load'):
await self.page.get_by_role('link', name=account).click()
await self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(1000, 3000))
log.info('Loaded activity page for account %s', account)
def download_transactions(
async def download_transactions(
self, from_date: datetime.date, to_date: datetime.date
) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date)
datefmt = '%m/%d/%Y'
self.page.get_by_role('link', name='Download Transactions').click()
self.page.wait_for_timeout(random.randint(750, 1250))
await self.page.get_by_role(
'link', name='Download Transactions'
).click()
await self.page.wait_for_timeout(random.randint(750, 1250))
modal = self.page.locator('#download-transactions')
input_from = modal.locator('input[data-qaid=fromDate]')
input_from.click()
self.page.keyboard.press('Control+A')
self.page.keyboard.press('Delete')
self.page.keyboard.type(from_date.strftime(datefmt))
await input_from.click()
await self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete')
await self.page.keyboard.type(from_date.strftime(datefmt))
input_to = modal.locator('input[data-qaid=toDate]')
input_to.click()
self.page.keyboard.press('Control+A')
self.page.keyboard.press('Delete')
self.page.keyboard.type(to_date.strftime(datefmt))
modal.get_by_role('button', name='Select Type').click()
self.page.get_by_text('Comma Separated').click()
with self.page.expect_download() as di:
self.page.get_by_role('button', name='Download').click()
await input_to.click()
await self.page.keyboard.press('Control+A')
await self.page.keyboard.press('Delete')
await self.page.keyboard.type(to_date.strftime(datefmt))
await modal.get_by_role('button', name='Select Type').click()
await self.page.get_by_text('Comma Separated').click()
async with self.page.expect_download() as di:
await self.page.get_by_role('button', name='Download').click()
log.debug('Waiting for download to complete')
path = di.value.path()
path = await (await di.value).path()
assert path
log.info('Downloaded transactions to %s', path)
modal.get_by_label('Close').click()
await modal.get_by_label('Close').click()
return path
def firefly_import(
async def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter
) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG)
config['default_account'] = account
importer.import_csv(csv, config)
await importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
class Chase:
@ -513,26 +530,27 @@ class Chase:
self.saved_cookies = Path('cookies.json')
self._logged_in = False
def __enter__(self) -> 'Chase':
self.load_cookies()
async def __aenter__(self) -> 'Chase':
await self.load_cookies()
return self
def __exit__(
async def __aexit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> None:
try:
self.logout()
await self.logout()
finally:
self.save_cookies()
await self.save_cookies()
def load_cookies(self) -> None:
async def load_cookies(self) -> None:
log.debug('Loading saved cookies from %s', self.saved_cookies)
try:
with self.saved_cookies.open(encoding='utf-8') as f:
self.page.context.add_cookies(json.load(f))
cookies = await asyncio.to_thread(json.load, f)
await self.page.context.add_cookies(cookies)
except:
log.warning(
'Could not load saved cookies, '
@ -541,94 +559,101 @@ class Chase:
else:
log.info('Successfully loaded saved cookies')
def save_cookies(self) -> None:
async def save_cookies(self) -> None:
log.debug('Saving cookies from %s', self.saved_cookies)
try:
with self.saved_cookies.open('w', encoding='utf-8') as f:
f.write(json.dumps(self.page.context.cookies()))
cookies = await self.page.context.cookies()
f.write(await asyncio.to_thread(json.dumps, cookies))
except Exception as e:
log.error('Failed to save cookies: %s', e)
else:
log.info('Successfully saved cookies to %s', self.saved_cookies)
def login(self) -> None:
async def login(self) -> None:
if self._logged_in:
return
log.debug('Navigating to %s', self.URL)
self.page.goto(self.URL)
self.page.wait_for_load_state()
self.page.wait_for_timeout(random.randint(2000, 4000))
username = self.secrets.get_secret('bank.chase.username').decode()
password = self.secrets.get_secret('bank.chase.password').decode()
await self.page.goto(self.URL)
await self.page.wait_for_load_state()
await self.page.wait_for_timeout(random.randint(2000, 4000))
username = await self.get_secret('bank.chase.username')
password = await self.get_secret('bank.chase.password')
log.debug('Filling username/password login form')
logonbox = self.page.frame_locator('#logonbox')
logonbox.get_by_label('Username').fill(username)
logonbox.get_by_label('Password').fill(password)
self.page.wait_for_timeout(random.randint(500, 750))
logonbox.get_by_role('button', name='Sign in').click()
await logonbox.get_by_label('Username').fill(username)
await logonbox.get_by_label('Password').fill(password)
await self.page.wait_for_timeout(random.randint(500, 750))
await logonbox.get_by_role('button', name='Sign in').click()
log.debug('Waiting for page load')
self.page.wait_for_load_state()
self.page.get_by_role('button', name='Pay Card').wait_for(
await self.page.wait_for_load_state()
await self.page.get_by_role('button', name='Pay Card').wait_for(
timeout=120000
)
log.info('Successfully logged in to Chase')
self._logged_in = True
def download_transactions(
async def download_transactions(
self, from_date: datetime.date, to_date: datetime.date
) -> Path:
log.info('Downloading transactions from %s to %s', from_date, to_date)
fmt = '%m/%d/%Y'
self.page.locator('#CARD_ACCOUNTS').get_by_role(
await self.page.locator('#CARD_ACCOUNTS').get_by_role(
'button', name='(...2467)'
).first.click()
fl = self.page.locator('#flyout')
fl.wait_for()
fl.get_by_role('button', name='Pay card', exact=True).wait_for()
fl.get_by_role(
await fl.wait_for()
await fl.get_by_role('button', name='Pay card', exact=True).wait_for()
await fl.get_by_role(
'button', name='Account activity', exact=True
).wait_for()
fl.get_by_role('link', name='Show details').wait_for()
fl.get_by_role('link', name='Activity since last statement').click()
fl.get_by_role('link', name='All transactions').click()
fl.get_by_text('See more activity').wait_for()
fl.get_by_role('button', name='Download Account Activity').click()
await fl.get_by_role('link', name='Show details').wait_for()
await fl.get_by_role(
'link', name='Activity since last statement'
).click()
await fl.get_by_role('link', name='All transactions').click()
await fl.get_by_text('See more activity').wait_for()
await fl.get_by_role(
'button', name='Download Account Activity'
).click()
log.debug('Filling account activity download form')
self.page.locator('#select-downloadActivityOptionId-label').click()
self.page.get_by_text('Choose a date range').nth(1).locator(
await self.page.locator(
'#select-downloadActivityOptionId-label'
).click()
await self.page.get_by_text('Choose a date range').nth(1).locator(
'../..'
).click()
self.page.wait_for_timeout(random.randint(500, 1500))
self.page.locator('#accountActivityFromDate-input-input').fill(
await self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityFromDate-input-input').fill(
from_date.strftime(fmt)
)
self.page.locator('#accountActivityFromDate-input-input').blur()
self.page.wait_for_timeout(random.randint(500, 1500))
self.page.locator('#accountActivityToDate-input-input').fill(
await self.page.locator('#accountActivityFromDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500))
await self.page.locator('#accountActivityToDate-input-input').fill(
to_date.strftime(fmt)
)
self.page.locator('#accountActivityToDate-input-input').blur()
self.page.wait_for_timeout(random.randint(500, 1500))
with self.page.expect_download(timeout=5000) as di:
self.page.get_by_role(
await self.page.locator('#accountActivityToDate-input-input').blur()
await self.page.wait_for_timeout(random.randint(500, 1500))
async with self.page.expect_download(timeout=5000) as di:
await self.page.get_by_role(
'button', name='Download', exact=True
).click()
log.debug('Waiting for download to complete')
self.page.wait_for_timeout(random.randint(1000, 2500))
path = di.value.path()
await self.page.wait_for_timeout(random.randint(1000, 2500))
path = await (await di.value).path()
assert path
log.info('Downloaded transactions to %s', path)
return path
def logout(self) -> None:
async def logout(self) -> None:
if not self._logged_in:
return
log.debug('Logging out of Chase')
with self.page.expect_event('load'):
self.page.get_by_role('button', name='Sign out').click()
async with self.page.expect_event('load'):
await self.page.get_by_role('button', name='Sign out').click()
log.info('Logged out of Chase')
def firefly_import(
async def firefly_import(
self, csv: Path, account: int, importer: FireflyImporter
) -> None:
config = copy.deepcopy(self.IMPORT_CONFIG)
@ -643,38 +668,52 @@ class Chase:
config['do_mapping'].pop(0)
else:
raise ValueError(f'Unexpected CSV schema: {headers}')
importer.import_csv(csv, config)
await importer.import_csv(csv, config)
async def get_secret(self, key: str) -> str:
secret = await self.secrets.get_secret(key)
return secret.decode()
def main() -> None:
async def amain() -> None:
logging.basicConfig(level=logging.DEBUG)
secrets = SecretsClient()
secrets.connect()
await secrets.connect()
log.debug('Getting Firefly III access token')
token = secrets.get_secret('firefly.token').decode()
import_secret = secrets.get_secret('firefly.import.secret').decode()
token = (await secrets.get_secret('firefly.token')).decode()
import_secret = (
await secrets.get_secret('firefly.import.secret')
).decode()
import_auth = (
secrets.get_secret('firefly.import.username').decode(),
secrets.get_secret('firefly.import.password').decode(),
(await secrets.get_secret('firefly.import.username')).decode(),
(await secrets.get_secret('firefly.import.password')).decode(),
)
importer = FireflyImporter(
FIREFLY_III_IMPORTER_URL, import_secret, import_auth
)
end_date = datetime.date.today() - datetime.timedelta(days=1)
with sync_playwright() as pw, secrets:
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
browser = pw.firefox.launch(headless=headless)
page = browser.new_page()
failed = False
async with async_playwright() as pw, secrets:
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
browser = await pw.firefox.launch(headless=headless)
page = await browser.new_page()
banks = sys.argv[1:] or list(ACCOUNTS.keys())
if 'commerce' in banks:
if not download_commerce(page, secrets, end_date, token, importer):
if not await download_commerce(
page, secrets, end_date, token, importer
):
failed = True
if 'chase' in banks:
if not download_chase(page, secrets, end_date, token, importer):
if not await download_chase(
page, secrets, end_date, token, importer
):
failed = True
raise SystemExit(1 if failed else 0)
def main():
asyncio.run(amain())
if __name__ == '__main__':
main()