diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..f7baea4 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E121,E125 diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..d37bb0d --- /dev/null +++ b/.pylintrc @@ -0,0 +1,5 @@ +[MESSAGES CONTROL] +disable = + invalid-name, + missing-docstring, + too-few-public-methods, diff --git a/setup.py b/setup.py index 0a87484..104428d 100644 --- a/setup.py +++ b/setup.py @@ -13,4 +13,9 @@ setup( setup_requires=[ 'setuptools_scm', ], + entry_points={ + 'console_scripts': [ + 'bwpass=bwpass:main', + ], + }, ) diff --git a/src/bwpass.py b/src/bwpass.py index e69de29..baf7488 100644 --- a/src/bwpass.py +++ b/src/bwpass.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python +from typing import ( + Mapping, + Optional, + Sequence, +) +import argparse +import json +import locale +import logging +import os +import subprocess +import sys +import tempfile + + +log = logging.getLogger('bwpass') + + +LOG_FORMAT = ( + '%(asctime)s [%(name)s] %(threadName)s %(levelname)s %(message)s' +) +PINENTRY_DEBUG = os.environ.get('PINENTRY_DEBUG') +XDG_CACHE_HOME = os.environ.get( + 'XDG_CACHE_HOME', + os.path.expanduser('~/.cache'), +) + +BITWARDENCLI_APPDATA_DIR = os.environ.get( + 'BITWARDENCLI_APPDATA_DIR', + os.path.join(XDG_CACHE_HOME, 'bitwarden'), +) + + +class BitwardenError(Exception): + pass + + +class PinentryError(Exception): + pass + + +class MainArguments: + + quiet: int + verbose: int + args: Sequence[str] + + +class Pinentry: + + def __init__( + self, + title: str = None, + description: str = None, + prompt: str = None, + ) -> None: + self.title = title + self.description = description + self.prompt = prompt + + def getpin(self): + codec = locale.getpreferredencoding() + p = subprocess.Popen( + ['pinentry'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + def getline(): + line = p.stdout.readline().rstrip(b'\n').decode(codec) + if PINENTRY_DEBUG: + log.debug('pinentry: %s', line) + if line.startswith('ERR'): + raise PinentryError(line) + return line + + def putline(line): + p.stdin.write(line.encode(codec) + b'\n') + p.stdin.flush() + + try: + with p.stdin, p.stdout: + getline() + if self.title: + putline(f'SETTITLE {self.title}') + getline() + if self.description: + putline(f'SETDESC {self.description}') + getline() + if self.prompt: + putline(f'SETTITLE {self.title}') + getline() + putline('GETPIN') + d = getline() + if not d.startswith('D '): + raise PinentryError( + f'Unexpected response from pinentry: {d}' + ) + pin = d[2:] + getline() + finally: + p.wait() + return pin + + +class Vault: + + def __init__(self) -> None: + self.items: Mapping[str, str] = {} + self.session_id: Optional[str] = None + + self.bw_data = os.path.join(BITWARDENCLI_APPDATA_DIR, 'data.json') + self.cache = os.path.join(BITWARDENCLI_APPDATA_DIR, 'bwpass.json') + + def __enter__(self) -> 'Vault': + return self + + def __exit__(self, exc_type, exc_value, tb): + with open(self.cache, 'w') as f: + json.dump(self.items, f) + + @classmethod + def load(cls) -> 'Vault': + self = cls() + try: + st = os.stat(self.cache) + st2 = os.stat(self.bw_data) + except FileNotFoundError: + pass + else: + if st.st_mtime >= st2.st_mtime: + self.load_cache() + return self + self.unlock() + self.get_items() + return self + + def get_items(self): + folders: Mapping[str, str] = {} + for folder in json.loads(self._run('list', 'folders')): + folders[folder['id']] = folder['name'] + for item in json.loads(self._run('list', 'items')): + if item['folderId'] is None: + key = item['name'] + else: + key = f'{folders[item["folderId"]]}/{item["name"]}' + self.items[key] = item['id'] + + def get_password(self, item): + item_id = self.items[item] + log.debug('Getting password for %s', item_id) + if not self.session_id: + self.unlock() + return self._run('get', 'password', item_id) + + def load_cache(self) -> None: + log.debug('Loading items from cache') + with open(self.cache) as f: + self.items = json.load(f) + + def unlock(self) -> None: + uid = os.getuid() + fn = os.path.join( + tempfile.gettempdir(), + f'.bw_session-{uid}', + ) + try: + with open(fn) as f: + log.debug('Loading session ID from %s', fn) + self.session_id = f.readline().rstrip('\n') + return + except FileNotFoundError: + pass + + pinentry = Pinentry( + title='Unlock Vault', + description='Enter the master password to unlock the Vault', + prompt='Master password:', + ) + for __ in range(3): + try: + log.debug('Getting master password') + password = pinentry.getpin() + self.session_id = self._run('unlock', '--raw', stdin=password) + except PinentryError as e: + log.error('Failed to get master password: %s', e) + break + except BitwardenError as e: + log.error('Error unlocking vault: %s', e) + else: + break + if self.session_id: + fd = os.open(fn, os.O_WRONLY | os.O_CREAT, 0o0400) + with open(fd, 'w') as f: + f.write(self.session_id) + + def _run(self, *args, stdin: str = None): + stdin_bytes = stdin.encode('utf-8') if stdin else None + cmd = ['bw'] + cmd += args + env = os.environ.copy() + env['BITWARDENCLI_APPDATA_DIR'] = BITWARDENCLI_APPDATA_DIR + if self.session_id: + env['BW_SESSION'] = self.session_id + log.debug('Running command: %s', cmd) + p = subprocess.Popen( + cmd, + stdin=subprocess.PIPE if stdin else subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + env=env, + ) + data = p.communicate(stdin_bytes)[0].decode('utf-8') + if p.returncode != 0: + raise BitwardenError(data.rstrip('\n')) + return data + + +def parse_args(): + parser = argparse.ArgumentParser() + + g_verb = parser.add_mutually_exclusive_group() + g_verb.add_argument( + '--verbose', '-v', + action='count', + default=0, + ) + g_verb.add_argument( + '--quiet', '-q', + action='count', + default=0, + ) + + parser.add_argument( + 'args', + nargs='*', + ) + + return parser.parse_args(namespace=MainArguments()) + + +def setup_logging(verbose: int = 0) -> None: + if verbose < -1: + level = logging.ERROR + elif verbose < 0: + level = logging.WARNING + elif verbose < 1: + level = logging.INFO + else: + level = logging.DEBUG + handler = logging.StreamHandler() + handler.setLevel(level) + handler.setFormatter(logging.Formatter(LOG_FORMAT)) + logger = logging.getLogger() + logger.setLevel(level) + logger.addHandler(handler) + + +def main(): + args = parse_args() + + setup_logging(args.verbose or args.quiet * -1) + + with Vault.load() as vault: + if args.args: + if len(args.args) == 1: + args.args.insert(0, 'show') + if args.args[0] == 'show': + item = args.args[1] + try: + print(vault.get_password(item), end='') + except KeyError: + print('No such item:', item, file=sys.stderr) + else: + print('\n'.join(sorted(vault.items))) + + +if __name__ == '__main__': + main()