317 lines
8.5 KiB
Python
317 lines
8.5 KiB
Python
#!/usr/bin/env python
|
||
from typing import (
|
||
Mapping,
|
||
Optional,
|
||
Sequence,
|
||
)
|
||
import argparse
|
||
import json
|
||
import locale
|
||
import logging
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
|
||
|
||
# Logger shared by everything in this script.
log = logging.getLogger('bwpass')

# Record layout handed to the logging formatter.
LOG_FORMAT = '%(asctime)s [%(name)s] %(threadName)s %(levelname)s %(message)s'

# When set to a non-empty value, every line received from pinentry is
# logged at debug level.
PINENTRY_DEBUG = os.getenv('PINENTRY_DEBUG')

# Per-user directories taken from the environment; only the cache
# directory has a fallback (~/.cache).
XDG_CACHE_HOME = os.getenv('XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
XDG_RUNTIME_DIR = os.getenv('XDG_RUNTIME_DIR')

# Data directory handed to the bw CLI; an explicit environment setting
# wins over the default below the cache directory.
BITWARDENCLI_APPDATA_DIR = os.getenv(
    'BITWARDENCLI_APPDATA_DIR',
    os.path.join(XDG_CACHE_HOME, 'bitwarden'),
)
|
||
|
||
|
||
class BitwardenError(Exception):
    """Raised when a ``bw`` command exits with a non-zero status."""
|
||
|
||
|
||
class LockedVaultError(BitwardenError):
    """Raised when ``bw`` fails with the message ``Vault is locked.``."""
|
||
|
||
|
||
class PinentryError(Exception):
    """Raised when pinentry answers with ``ERR`` or an unexpected line."""
|
||
|
||
|
||
class MainArguments:
    """Typed namespace that receives the parsed command line."""

    # How many times --quiet/-q was given.
    quiet: int
    # How many times --verbose/-v was given.
    verbose: int
    # Positional words that remain after the options.
    args: Sequence[str]
|
||
|
||
|
||
class Pinentry:
    """Ask the user for a secret through the external ``pinentry`` program.

    The dialog is driven over pinentry's line-based protocol on the child
    process' stdin and stdout.
    """

    def __init__(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
        prompt: Optional[str] = None,
    ) -> None:
        """Remember the texts to show; falsy values are not sent at all."""
        self.title = title
        self.description = description
        self.prompt = prompt

    @staticmethod
    def _unescape(data: str, codec: str = 'utf-8') -> str:
        """Undo the percent-escaping applied to a pinentry data line.

        Data lines carry ``%``, CR and LF as ``%25``, ``%0D`` and ``%0A``;
        without decoding, a PIN containing ``%`` would come back altered.
        Sequences that are not valid ``%XX`` escapes are left untouched.
        """
        # Local import: the module is needed by this helper only.
        from urllib.parse import unquote

        return unquote(data, encoding=codec)

    def getpin(self):
        """Run pinentry and return what the user typed.

        The terminal name of stdin is passed to pinentry with ``-T``, so
        stdin has to be attached to a terminal.

        Raises:
            PinentryError: pinentry answered with an ``ERR`` line, or the
                reply to ``GETPIN`` was not a ``D`` data line.
        """
        codec = locale.getpreferredencoding()
        tty = os.ttyname(sys.stdin.fileno())
        proc = subprocess.Popen(
            ['pinentry', '-T', tty],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

        def getline():
            # Read one response line; any ERR response aborts the dialog.
            line = proc.stdout.readline().rstrip(b'\n').decode(codec)
            if PINENTRY_DEBUG:
                log.debug('pinentry: %s', line)
            if line.startswith('ERR'):
                raise PinentryError(line)
            return line

        def putline(line):
            # Flush right away: pinentry answers each command in turn.
            proc.stdin.write(line.encode(codec) + b'\n')
            proc.stdin.flush()

        settings = (
            ('SETTITLE', self.title),
            ('SETDESC', self.description),
            ('SETPROMPT', self.prompt),
        )
        try:
            with proc.stdin, proc.stdout:
                # Consume the line pinentry sends before any command.
                getline()
                for command, value in settings:
                    if value:
                        putline(f'{command} {value}')
                        getline()
                putline('GETPIN')
                reply = getline()
                if not reply.startswith('D '):
                    raise PinentryError(
                        f'Unexpected response from pinentry: {reply}'
                    )
                pin = self._unescape(reply[2:], codec)
                # Read the status line that follows the data line.
                getline()
        finally:
            # The pipes are closed by now; reap the child in every case.
            proc.wait()
        return pin
|
||
|
||
|
||
class Vault:
    """Index of Bitwarden items, backed by the ``bw`` command line tool.

    ``items`` maps ``'<folder name>/<item name>'`` (or just the item name
    when the item has no folder) to the Bitwarden item id.  The mapping is
    cached in ``bwpass.json`` inside ``BITWARDENCLI_APPDATA_DIR`` and the
    session key printed by ``bw unlock --raw`` is kept in a per-user
    session file.

    Used as a context manager, the item cache is written back on exit.
    """

    def __init__(self) -> None:
        self.items: Mapping[str, str] = {}
        self.session_id: Optional[str] = None

        self.bw_data = os.path.join(BITWARDENCLI_APPDATA_DIR, 'data.json')
        self.cache = os.path.join(BITWARDENCLI_APPDATA_DIR, 'bwpass.json')

    def __enter__(self) -> 'Vault':
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Persist ``items`` to the cache file; exceptions are not suppressed.

        The data is written to a temporary file in the same directory and
        then renamed over the cache.  An interrupted run therefore cannot
        leave a truncated cache behind, and the file ends up readable by
        its owner only (``mkstemp`` creates it with mode 0600).
        """
        cache_dir = os.path.dirname(self.cache) or '.'
        fd, tmp_path = tempfile.mkstemp(
            dir=cache_dir,
            prefix='.bwpass-',
            suffix='.tmp',
        )
        try:
            with open(fd, 'w') as f:
                json.dump(self.items, f)
            os.replace(tmp_path, self.cache)
        except BaseException:
            # Do not leave the temporary file behind on failure.
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass
            raise

    @property
    def _bw_session_file(self) -> str:
        """Path of the file that stores the session key.

        ``XDG_RUNTIME_DIR`` is used when it is set and exists: the plain
        name ``.bw_session`` if the directory belongs to the current user,
        a uid-suffixed name otherwise.  Without a usable runtime directory
        the uid-suffixed name is placed in the system temp directory.
        """
        uid = os.getuid()
        if XDG_RUNTIME_DIR:
            try:
                st = os.stat(XDG_RUNTIME_DIR)
            except FileNotFoundError:
                pass
            else:
                if st.st_uid == uid:
                    return os.path.join(XDG_RUNTIME_DIR, '.bw_session')
                return os.path.join(XDG_RUNTIME_DIR, f'.bw_session-{uid}')
        return os.path.join(tempfile.gettempdir(), f'.bw_session-{uid}')

    @classmethod
    def load(cls) -> 'Vault':
        """Create a vault and fill ``items`` from the cache or from ``bw``.

        The cache is used when it is at least as new as the CLI's
        ``data.json``.  A cache that cannot be parsed is ignored and the
        items are fetched again.  When fetching fails because the vault is
        locked, the stored session is discarded and unlocking is retried
        once.
        """
        self = cls()
        try:
            cache_stat = os.stat(self.cache)
            data_stat = os.stat(self.bw_data)
        except FileNotFoundError:
            pass
        else:
            if cache_stat.st_mtime >= data_stat.st_mtime:
                try:
                    self.load_cache()
                except ValueError as e:
                    # json.JSONDecodeError is a ValueError: fall through
                    # and rebuild instead of failing on every run.
                    log.warning(
                        'Ignoring unreadable cache %s: %s', self.cache, e,
                    )
                else:
                    return self
        self.unlock()
        try:
            self.get_items()
        except LockedVaultError:
            self.lock()
            self.unlock()
            self.get_items()
        return self

    def get_items(self):
        """Fetch folders and items from ``bw`` and rebuild ``items``.

        ``items`` is replaced only after both listings succeeded.
        """
        folders = {
            folder['id']: folder['name']
            for folder in json.loads(self._run('list', 'folders'))
        }
        items = dict(self.items)
        for item in json.loads(self._run('list', 'items')):
            # Only type 1 is indexed -- presumably the login item type;
            # verify against the bw CLI documentation.
            if item['type'] != 1:
                continue
            if item['folderId'] is None:
                key = item['name']
            else:
                key = f'{folders[item["folderId"]]}/{item["name"]}'
            items[key] = item['id']
        self.items = items

    def get_password(self, item):
        """Return the password of ``item`` as printed by ``bw``.

        Raises:
            KeyError: ``item`` is not a known item name.
            LockedVaultError: the vault is still locked after unlocking.
            BitwardenError: ``bw`` failed for another reason.
        """
        item_id = self.items[item]
        log.debug('Getting password for %s', item_id)
        if not self.session_id:
            self.unlock()
        try:
            return self._run('get', 'password', item_id)
        except LockedVaultError:
            if not self.session_id:
                # Unlocking failed or was cancelled: nothing to retry.
                raise
            # The stored session key is no longer accepted.  Discard it
            # and ask again, exactly as load() does.
            self.lock()
            self.unlock()
            return self._run('get', 'password', item_id)

    def load_cache(self) -> None:
        """Replace ``items`` with the content of the cache file."""
        log.debug('Loading items from cache')
        with open(self.cache) as f:
            self.items = json.load(f)

    def lock(self) -> None:
        """Forget the session key and delete the session file if present."""
        fn = self._bw_session_file
        try:
            os.unlink(fn)
        except FileNotFoundError:
            pass
        self.session_id = None

    def unlock(self) -> None:
        """Obtain a session key from the session file or via ``bw unlock``.

        Without a session file the master password is requested through
        pinentry, at most three times; a pinentry failure stops the
        attempts immediately.  A newly obtained key is stored in the
        session file.  ``session_id`` stays unset when unlocking fails.
        """
        fn = self._bw_session_file
        try:
            with open(fn) as f:
                log.debug('Loading session ID from %s', fn)
                self.session_id = f.readline().rstrip('\n')
            return
        except FileNotFoundError:
            pass

        pinentry = Pinentry(
            title='Unlock Vault',
            description='Enter the master password to unlock the Vault',
            prompt='Master password:',
        )
        for __ in range(3):
            try:
                log.debug('Getting master password')
                password = pinentry.getpin()
                self.session_id = self._run('unlock', '--raw', stdin=password)
            except PinentryError as e:
                log.error('Failed to get master password: %s', e)
                break
            except BitwardenError as e:
                log.error('Error unlocking vault: %s', e)
            else:
                break
        if self.session_id:
            # O_EXCL: the file name is predictable and may live in the
            # shared temp directory, so never follow a planted symlink or
            # write into a file somebody else created in the meantime.
            flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
            try:
                fd = os.open(fn, flags, 0o0400)
            except FileExistsError:
                log.warning('Not overwriting existing session file %s', fn)
            else:
                with open(fd, 'w') as f:
                    f.write(self.session_id)

    def _run(self, *args, stdin: Optional[str] = None):
        """Run ``bw`` with ``args`` and return its decoded standard output.

        ``stdin``, when non-empty, is fed to the process as UTF-8.  The
        current session key, if any, is passed as ``BW_SESSION``.

        Raises:
            LockedVaultError: ``bw`` failed and printed ``Vault is locked.``
            BitwardenError: ``bw`` failed with any other output.
        """
        stdin_bytes = stdin.encode('utf-8') if stdin else None
        cmd = ['bw', *args]
        env = os.environ.copy()
        env['BITWARDENCLI_APPDATA_DIR'] = BITWARDENCLI_APPDATA_DIR
        if self.session_id:
            env['BW_SESSION'] = self.session_id
        log.debug('Running command: %s', cmd)
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE if stdin else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            # NOTE(review): stderr is discarded, so the error text below is
            # whatever bw printed on stdout -- confirm that the installed
            # bw version reports errors there.
            stderr=subprocess.DEVNULL,
            env=env,
        )
        data = p.communicate(stdin_bytes)[0].decode('utf-8')
        if p.returncode != 0:
            msg = data.rstrip('\n')
            if msg == 'Vault is locked.':
                raise LockedVaultError(msg)
            raise BitwardenError(msg)
        return data
|
||
|
||
|
||
def parse_args(argv: Optional[Sequence[str]] = None):
    """Parse the command line into a ``MainArguments`` namespace.

    Args:
        argv: Argument strings to parse.  ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before; passing a list
            allows the parser to be driven programmatically.

    Returns:
        A ``MainArguments`` instance with ``verbose`` and ``quiet`` counts
        (mutually exclusive, both default to 0) and the positional
        ``args`` list.
    """
    parser = argparse.ArgumentParser()

    g_verb = parser.add_mutually_exclusive_group()
    g_verb.add_argument(
        '--verbose', '-v',
        action='count',
        default=0,
        help='log more; may be repeated',
    )
    g_verb.add_argument(
        '--quiet', '-q',
        action='count',
        default=0,
        help='log less; may be repeated',
    )

    parser.add_argument(
        'args',
        nargs='*',
        help='item to show, optionally preceded by "show"; '
             'without arguments all items are listed',
    )

    return parser.parse_args(argv, namespace=MainArguments())
|
||
|
||
|
||
def setup_logging(verbose: int = 0) -> None:
    """Attach a stream handler to the root logger at the requested level.

    A ``verbose`` of 1 or more selects DEBUG, 0 selects INFO, -1 selects
    WARNING and anything below that selects ERROR.
    """
    if verbose >= 1:
        threshold = logging.DEBUG
    elif verbose >= 0:
        threshold = logging.INFO
    elif verbose >= -1:
        threshold = logging.WARNING
    else:
        threshold = logging.ERROR

    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter(LOG_FORMAT))
    stream.setLevel(threshold)

    # Handler and root logger share one threshold.
    root = logging.getLogger()
    root.addHandler(stream)
    root.setLevel(threshold)
|
||
|
||
|
||
def main():
    """Command line entry point.

    Without positional arguments every known item name is printed, one per
    line, in sorted order.  ``show ITEM`` (or just ``ITEM``) prints the
    password of that item without a trailing newline.

    Exits with status 1 when the item does not exist and with status 2
    when the first of several words is not a known command, so that
    calling scripts can detect the failure.
    """
    args = parse_args()

    # --quiet counts downwards: -q gives -1, -qq gives -2, ...
    setup_logging(args.verbose or -args.quiet)

    with Vault.load() as vault:
        words = list(args.args)
        if not words:
            print('\n'.join(sorted(vault.items)))
            return

        # A single word is shorthand for "show <word>".
        if len(words) == 1:
            words.insert(0, 'show')
        command, item = words[0], words[1]

        if command != 'show':
            print('Unknown command:', command, file=sys.stderr)
            sys.exit(2)
        try:
            print(vault.get_password(item), end='')
        except KeyError:
            print('No such item:', item, file=sys.stderr)
            sys.exit(1)
|
||
|
||
|
||
# Run the command line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()
|