This repository has been archived on 2023-11-14. You can view files and clone it, but cannot push or open issues/pull-requests.
bwpass/src/bwpass.py

281 lines
7.4 KiB
Python
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

#!/usr/bin/env python
from typing import (
Mapping,
Optional,
Sequence,
)
import argparse
import json
import locale
import logging
import os
import subprocess
import sys
import tempfile
log = logging.getLogger('bwpass')
LOG_FORMAT = (
'%(asctime)s [%(name)s] %(threadName)s %(levelname)s %(message)s'
)
PINENTRY_DEBUG = os.environ.get('PINENTRY_DEBUG')
XDG_CACHE_HOME = os.environ.get(
'XDG_CACHE_HOME',
os.path.expanduser('~/.cache'),
)
BITWARDENCLI_APPDATA_DIR = os.environ.get(
'BITWARDENCLI_APPDATA_DIR',
os.path.join(XDG_CACHE_HOME, 'bitwarden'),
)
class BitwardenError(Exception):
pass
class PinentryError(Exception):
pass
class MainArguments:
quiet: int
verbose: int
args: Sequence[str]
class Pinentry:
def __init__(
self,
title: str = None,
description: str = None,
prompt: str = None,
) -> None:
self.title = title
self.description = description
self.prompt = prompt
def getpin(self):
codec = locale.getpreferredencoding()
p = subprocess.Popen(
['pinentry'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
def getline():
line = p.stdout.readline().rstrip(b'\n').decode(codec)
if PINENTRY_DEBUG:
log.debug('pinentry: %s', line)
if line.startswith('ERR'):
raise PinentryError(line)
return line
def putline(line):
p.stdin.write(line.encode(codec) + b'\n')
p.stdin.flush()
try:
with p.stdin, p.stdout:
getline()
if self.title:
putline(f'SETTITLE {self.title}')
getline()
if self.description:
putline(f'SETDESC {self.description}')
getline()
if self.prompt:
putline(f'SETTITLE {self.title}')
getline()
putline('GETPIN')
d = getline()
if not d.startswith('D '):
raise PinentryError(
f'Unexpected response from pinentry: {d}'
)
pin = d[2:]
getline()
finally:
p.wait()
return pin
class Vault:
def __init__(self) -> None:
self.items: Mapping[str, str] = {}
self.session_id: Optional[str] = None
self.bw_data = os.path.join(BITWARDENCLI_APPDATA_DIR, 'data.json')
self.cache = os.path.join(BITWARDENCLI_APPDATA_DIR, 'bwpass.json')
def __enter__(self) -> 'Vault':
return self
def __exit__(self, exc_type, exc_value, tb):
with open(self.cache, 'w') as f:
json.dump(self.items, f)
@classmethod
def load(cls) -> 'Vault':
self = cls()
try:
st = os.stat(self.cache)
st2 = os.stat(self.bw_data)
except FileNotFoundError:
pass
else:
if st.st_mtime >= st2.st_mtime:
self.load_cache()
return self
self.unlock()
self.get_items()
return self
def get_items(self):
folders: Mapping[str, str] = {}
for folder in json.loads(self._run('list', 'folders')):
folders[folder['id']] = folder['name']
for item in json.loads(self._run('list', 'items')):
if item['folderId'] is None:
key = item['name']
else:
key = f'{folders[item["folderId"]]}/{item["name"]}'
self.items[key] = item['id']
def get_password(self, item):
item_id = self.items[item]
log.debug('Getting password for %s', item_id)
if not self.session_id:
self.unlock()
return self._run('get', 'password', item_id)
def load_cache(self) -> None:
log.debug('Loading items from cache')
with open(self.cache) as f:
self.items = json.load(f)
def unlock(self) -> None:
uid = os.getuid()
fn = os.path.join(
tempfile.gettempdir(),
f'.bw_session-{uid}',
)
try:
with open(fn) as f:
log.debug('Loading session ID from %s', fn)
self.session_id = f.readline().rstrip('\n')
return
except FileNotFoundError:
pass
pinentry = Pinentry(
title='Unlock Vault',
description='Enter the master password to unlock the Vault',
prompt='Master password:',
)
for __ in range(3):
try:
log.debug('Getting master password')
password = pinentry.getpin()
self.session_id = self._run('unlock', '--raw', stdin=password)
except PinentryError as e:
log.error('Failed to get master password: %s', e)
break
except BitwardenError as e:
log.error('Error unlocking vault: %s', e)
else:
break
if self.session_id:
fd = os.open(fn, os.O_WRONLY | os.O_CREAT, 0o0400)
with open(fd, 'w') as f:
f.write(self.session_id)
def _run(self, *args, stdin: str = None):
stdin_bytes = stdin.encode('utf-8') if stdin else None
cmd = ['bw']
cmd += args
env = os.environ.copy()
env['BITWARDENCLI_APPDATA_DIR'] = BITWARDENCLI_APPDATA_DIR
if self.session_id:
env['BW_SESSION'] = self.session_id
log.debug('Running command: %s', cmd)
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE if stdin else subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
env=env,
)
data = p.communicate(stdin_bytes)[0].decode('utf-8')
if p.returncode != 0:
raise BitwardenError(data.rstrip('\n'))
return data
def parse_args():
parser = argparse.ArgumentParser()
g_verb = parser.add_mutually_exclusive_group()
g_verb.add_argument(
'--verbose', '-v',
action='count',
default=0,
)
g_verb.add_argument(
'--quiet', '-q',
action='count',
default=0,
)
parser.add_argument(
'args',
nargs='*',
)
return parser.parse_args(namespace=MainArguments())
def setup_logging(verbose: int = 0) -> None:
if verbose < -1:
level = logging.ERROR
elif verbose < 0:
level = logging.WARNING
elif verbose < 1:
level = logging.INFO
else:
level = logging.DEBUG
handler = logging.StreamHandler()
handler.setLevel(level)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger = logging.getLogger()
logger.setLevel(level)
logger.addHandler(handler)
def main():
args = parse_args()
setup_logging(args.verbose or args.quiet * -1)
with Vault.load() as vault:
if args.args:
if len(args.args) == 1:
args.args.insert(0, 'show')
if args.args[0] == 'show':
item = args.args[1]
try:
print(vault.get_password(item), end='')
except KeyError:
print('No such item:', item, file=sys.stderr)
else:
print('\n'.join(sorted(vault.items)))
if __name__ == '__main__':
main()