1
0
Fork 0

secretsocket: Handle secrets via external process

`xactfetch` has three different ways of reading secret values:

* From environment variables
* By reading the contents of a file (specified by environment variables)
* By looking them up in the Bitwarden vault

This is very cumbersome to work with, especially when trying to
troubleshoot using the container image locally.

To make this easier, I've factored out all secret lookup functionality
into a separate process.  This process listens on a UNIX socket and
implements a very simple secret lookup protocol.  The client
(`xactfetch` itself in this case) sends a string key, identifying the
secret it wants to look up, terminated by a single line feed character.
The `secretsocket` server looks up the secret associated with that key,
using the method defined in a TOML configuration file.  There are four
supported methods:

* Environment variables
* External programs
* File contents
* Static strings

The value returned by the corresponding method is then sent back to the
client via the socket connection, again as a string terminated with a
line feed.

Moving the secret handling into a separate process simplifies the
environment configuration needed in order to run `xactfetch`.  Notably,
when running it in a container, only the `secretsocket` soket needs to
be mounted into the container.  Since `rbw` is executed by the server
process now, rather than `xactfetch` directly, the vault does not need
to be present in the `xactfetch` container.  Indeed, none of the secret
values need to be present in the container.
master
Dustin 2024-07-08 17:11:24 -05:00
parent 72eae4d5b3
commit b30b38f76f
3 changed files with 270 additions and 96 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
/.venv
/cookies.json
/screenshot_*.png
/secrets.toml
*.egg-info/
__pycache__/
*.py[co]

198
secretsocket.py Normal file
View File

@ -0,0 +1,198 @@
#!/usr/bin/env python3
import asyncio
import logging
import os
import shlex
import signal
import socket
import struct
import tomllib
from pathlib import Path
from typing import Optional
log = logging.getLogger('secretsocket')
ALLOW_UNKNOWN_PEER = os.environ.get('ALLOW_UNKNOWN_PEER') == '1'
XDG_RUNTIME_DIR = Path(os.environ['XDG_RUNTIME_DIR'])
class Secret:
async def lookup(self) -> Optional[bytes]:
raise NotImplementedError
class EnvSecret(Secret):
def __init__(self, env_var: str) -> None:
self.env_var = env_var
async def lookup(self) -> Optional[bytes]:
return os.environb.get(self.env_var.encode('utf-8'))
class ExecSecret(Secret):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def lookup(self) -> Optional[bytes]:
args = shlex.split(self.cmd)
proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
)
o = await proc.communicate()
return o[0]
class PathSecret(Secret):
def __init__(self, path: Path) -> None:
self.path = path
async def lookup(self) -> Optional[bytes]:
try:
f = self.path.expanduser().open('rb')
except OSError as e:
log.error('Failed to read secret from %s: %s', self.path, e)
return None
with f:
return await asyncio.to_thread(f.read)
class StringSecret(Secret):
def __init__(self, value: str) -> None:
self.value = value
async def lookup(self) -> Optional[bytes]:
return self.value.encode('utf-8')
class SecretServer:
def __init__(self, path: Optional[Path] = None) -> None:
self.path = path
async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
sock = writer.get_extra_info('socket')
pid, uid, gid = get_socket_peercred(sock)
except Exception as e:
log.error('Failed to get peer credentials: %s', e)
pid, uid, gid = None, None, None
log.debug('Client connected (pid %d, uid %d, gid %d)', pid, uid, gid)
if uid is None:
if ALLOW_UNKNOWN_PEER:
log.warning('Handling connection from unknown peer')
else:
log.error('Refusing to handle connection from unknown peer')
writer.close()
return
else:
my_uid = os.getresuid()[1]
if uid != my_uid:
log.error(
'Refusing to handle connection from PID %d: '
'peer UID %d does not match %d',
pid,
uid,
my_uid,
)
writer.close()
return
while 1:
try:
key = (await reader.readuntil(b'\n')).rstrip(b'\n').decode()
except asyncio.IncompleteReadError:
break
else:
log.info('Client %d requested secret %s', pid, key)
try:
secret = await self.get_secret(key)
except Exception:
log.exception('Failed to get secret:')
writer.close()
return
else:
writer.write(secret + b'\n')
await writer.drain()
log.debug('Client disconnected')
writer.close()
await writer.wait_closed()
async def get_secret(self, key: str) -> bytes:
secrets = await load_secrets(self.path)
if secret := secrets.get(key):
if value := await secret.lookup():
return value
else:
log.warning('Lookup of secret %s failed', key)
else:
log.warning('Unknown secret: %s', key)
return b''
def get_socket_peercred(sock: socket.socket) -> tuple[int, int, int]:
struct_ucred = '=iii'
buflen = struct.calcsize(struct_ucred)
cred = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, buflen)
return struct.unpack(struct_ucred, cred)
async def load_secrets(path: Optional[Path] = None) -> dict[str, Secret]:
if path is None:
path = Path('secrets.toml')
secrets = {}
try:
f = path.open('rb')
except OSError as e:
log.error('Failed to load secrets: %s', e)
return secrets
with f:
config = await asyncio.to_thread(tomllib.load, f)
for key, value in config.items():
if 'env' in value:
secrets[key] = EnvSecret(value['env'])
elif 'exec' in value:
secrets[key] = ExecSecret(value['exec'])
elif 'path' in value:
secrets[key] = PathSecret(Path(value['path']))
elif 'string' in value:
secrets[key] = StringSecret(value['string'])
else:
log.warning(
'Unsupported configuration for secret %s: %r', key, value
)
return secrets
def shutdown(signum, server):
log.info('Received signal %d, shutting down', signum)
server.close()
async def main():
logging.basicConfig(level=logging.DEBUG)
sock_path = XDG_RUNTIME_DIR / 'secretsocket/.ss'
if not sock_path.parent.exists():
sock_path.parent.mkdir()
if sock_path.exists():
sock_path.unlink()
ss = SecretServer()
server = await asyncio.start_unix_server(ss.handle_client, path=sock_path)
async with server:
await server.start_serving()
loop = asyncio.get_running_loop()
for signum in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(signum, shutdown, signum, server)
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())

View File

@ -1,13 +1,12 @@
import base64
import copy
import datetime
import getpass
import json
import logging
import os
import random
import socket
import sys
import subprocess
import urllib.parse
from pathlib import Path
from types import TracebackType
@ -25,6 +24,8 @@ NTFY_URL = os.environ['NTFY_URL']
NTFY_TOPIC = os.environ['NTFY_TOPIC']
FIREFLY_III_URL = os.environ['FIREFLY_III_URL']
FIREFLY_III_IMPORTER_URL = os.environ['FIREFLY_IMPORT_URL']
SECRET_SOCKET_PATH = os.environ.get('SECRET_SOCKET_PATH')
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR')
ACCOUNTS = {
'commerce': {
@ -71,6 +72,40 @@ class FireflyImporter:
r.raise_for_status()
class SecretsClient:
def __init__(self) -> None:
self.sock: socket.socket
def __enter__(self) -> 'SecretsClient':
if not hasattr(self, 'sock'):
self.connect()
return self
def __exit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
tb: Optional[TracebackType],
) -> bool:
self.sock.close()
return False
def connect(self) -> None:
if SECRET_SOCKET_PATH:
path = Path(SECRET_SOCKET_PATH)
elif XDG_RUNTIME_DIR:
path = Path(XDG_RUNTIME_DIR) / 'secretsocket/.ss'
else:
path = '.secretsocket'
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(str(path))
def get_secret(self, key: str) -> bytes:
self.sock.send(f'{key}\n'.encode('utf-8'))
buf = self.sock.recv(64 * 2**10)
return buf.rstrip(b'\n')
def ntfy(
message: Optional[str] = None,
topic: str = NTFY_TOPIC,
@ -111,57 +146,6 @@ def ntfy(
r.raise_for_status()
def rbw_unlocked() -> bool:
log.debug('Checking if rbw vault is locked')
cmd = ['rbw', 'unlocked']
p = subprocess.run(cmd, check=False, stdout=subprocess.DEVNULL)
unlocked = p.returncode == 0
log.info('rbw vault is %s', 'unlocked' if unlocked else 'locked')
return unlocked
def rbw_get(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting password for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'get']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rbw_code(
name: str, folder: Optional[str] = None, username: Optional[str] = None
) -> str:
log.info(
'Getting OTP code for Bitwarden vault item '
'%s (folder: %s, username: %s)',
name,
folder,
username,
)
cmd = ['rbw', 'code']
if folder is not None:
cmd += ('--folder', folder)
cmd.append(name)
if username is not None:
cmd.append(username)
p = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
assert p.stdout is not None
return p.stdout.rstrip('\n')
def rfc2047_base64encode(
message: str,
) -> str:
@ -169,13 +153,6 @@ def rfc2047_base64encode(
return f"=?UTF-8?B?{encoded}?="
def secret_from_file(env: str, default: str) -> str:
filename = os.environ.get(env, default)
log.debug('Loading secret value from %s', filename)
with open(filename, 'r', encoding='utf-8') as f:
return f.read().rstrip()
def get_last_transaction_date(key: int, token: str) -> datetime.date:
url = f'{FIREFLY_III_URL}/api/v1/accounts/{key}/transactions'
r = requests.get(
@ -205,9 +182,13 @@ def get_last_transaction_date(key: int, token: str) -> datetime.date:
def download_chase(
page: Page, end_date: datetime.date, token: str, importer: FireflyImporter
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool:
with Chase(page) as c, ntfyerror('Chase', page) as r:
with Chase(page, secrets) as c, ntfyerror('Chase', page) as r:
key = ACCOUNTS['chase']
try:
start_date = get_last_transaction_date(key, token)
@ -232,13 +213,16 @@ def download_chase(
def download_commerce(
page: Page,
secrets: SecretsClient,
end_date: datetime.date,
token: str,
importer: FireflyImporter,
) -> bool:
log.info('Downloading transaction lists from Commerce Bank')
csvs = []
with CommerceBank(page) as c, ntfyerror('Commerce Bank', page) as r:
with CommerceBank(page, secrets) as c, ntfyerror(
'Commerce Bank', page
) as r:
for name, key in ACCOUNTS['commerce'].items():
try:
start_date = get_last_transaction_date(key, token)
@ -367,11 +351,9 @@ class CommerceBank:
'ignore_duplicate_transactions': True,
}
def __init__(self, page: Page) -> None:
def __init__(self, page: Page, secrets: SecretsClient) -> None:
self.page = page
self.username = 'admiraln3mo'
self.vault_item = 'Commerce Bank'
self.vault_folder = 'Websites'
self.secrets = secrets
self._logged_in = False
def __enter__(self) -> 'CommerceBank':
@ -390,11 +372,10 @@ class CommerceBank:
return
log.debug('Navigating to %s', self.URL)
self.page.goto(self.URL)
password = rbw_get(self.vault_item, self.vault_folder, self.username)
username = self.secrets.get_secret('bank.commerce.username').decode()
password = self.secrets.get_secret('bank.commerce.password').decode()
log.debug('Filling username/password login form')
self.page.get_by_role('textbox', name='Customer ID').fill(
self.username
)
self.page.get_by_role('textbox', name='Customer ID').fill(username)
self.page.get_by_role('textbox', name='Password').fill(password)
self.page.get_by_role('button', name='Log In').click()
log.debug('Waiting for OTP 2FA form')
@ -402,7 +383,7 @@ class CommerceBank:
otp_input.wait_for()
self.page.wait_for_timeout(random.randint(1000, 3000))
log.debug('Filling OTP 2FA form')
otp = rbw_code(self.vault_item, self.vault_folder, self.username)
otp = self.secrets.get_secret('bank.commerce.otp').decode()
otp_input.fill(otp)
with self.page.expect_event('load'):
self.page.get_by_role('button', name='Continue').click()
@ -526,11 +507,9 @@ class Chase:
'ignore_duplicate_transactions': True,
}
def __init__(self, page: Page) -> None:
def __init__(self, page: Page, secrets: SecretsClient) -> None:
self.page = page
self.username = 'AdmiralN3mo'
self.vault_item = 'Chase'
self.vault_folder = 'Websites'
self.secrets = secrets
self.saved_cookies = Path('cookies.json')
self._logged_in = False
@ -579,18 +558,14 @@ class Chase:
self.page.goto(self.URL)
self.page.wait_for_load_state()
self.page.wait_for_timeout(random.randint(2000, 4000))
password = rbw_get(self.vault_item, self.vault_folder, self.username)
username = self.secrets.get_secret('bank.chase.username').decode()
password = self.secrets.get_secret('bank.chase.password').decode()
log.debug('Filling username/password login form')
self.page.frame_locator('#logonbox').get_by_label('Username').fill(
self.username
)
self.page.frame_locator('#logonbox').get_by_label('Password').fill(
password
)
logonbox = self.page.frame_locator('#logonbox')
logonbox.get_by_label('Username').fill(username)
logonbox.get_by_label('Password').fill(password)
self.page.wait_for_timeout(random.randint(500, 750))
self.page.frame_locator('#logonbox').get_by_role(
'button', name='Sign in'
).click()
logonbox.get_by_role('button', name='Sign in').click()
log.debug('Waiting for page load')
self.page.wait_for_load_state()
self.page.get_by_role('button', name='Pay Card').wait_for(
@ -673,30 +648,30 @@ class Chase:
def main() -> None:
logging.basicConfig(level=logging.DEBUG)
log.debug('Getting Firefly III access token from rbw vault')
token = rbw_get('xactfetch')
import_secret = secret_from_file(
'FIREFLY_IMPORT_SECRET_FILE', 'import.secret'
)
secrets = SecretsClient()
secrets.connect()
log.debug('Getting Firefly III access token')
token = secrets.get_secret('firefly.token').decode()
import_secret = secrets.get_secret('firefly.import.secret').decode()
import_auth = (
os.environ.get('FIREFLY_IMPORT_USER', getpass.getuser()),
secret_from_file('FIREFLY_IMPORT_PASSWORD_FILE', 'import.password'),
secrets.get_secret('firefly.import.username').decode(),
secrets.get_secret('firefly.import.password').decode(),
)
importer = FireflyImporter(
FIREFLY_III_IMPORTER_URL, import_secret, import_auth
)
end_date = datetime.date.today() - datetime.timedelta(days=1)
with sync_playwright() as pw:
with sync_playwright() as pw, secrets:
headless = os.environ.get('DEBUG_HEADLESS_BROWSER', '1') == '1'
browser = pw.firefox.launch(headless=headless)
page = browser.new_page()
failed = False
banks = sys.argv[1:] or list(ACCOUNTS.keys())
if 'commerce' in banks:
if not download_commerce(page, end_date, token, importer):
if not download_commerce(page, secrets, end_date, token, importer):
failed = True
if 'chase' in banks:
if not download_chase(page, end_date, token, importer):
if not download_chase(page, secrets, end_date, token, importer):
failed = True
raise SystemExit(1 if failed else 0)