#!/usr/bin/env python3
"""Fetch a TLS client certificate and key from a Kubernetes Secret.

The script reads a bearer token from the directory named by the
``CREDENTIALS_DIRECTORY`` environment variable, requests the Secret
``SECRET`` in ``NAMESPACE`` from the Kubernetes API at ``BASE_URL``, and
installs the decoded ``tls.crt`` / ``tls.key`` values at ``CERT_OUT`` and
``KEY_OUT``.  A file is only rewritten when its content differs from what
the Secret holds.
"""
import base64
import json
import logging
import os
import ssl
import urllib.error
import urllib.request
from pathlib import Path

try:
    from systemd.journal import JournalHandler as LogHandler
except ModuleNotFoundError:
    # Fall back to a plain stream handler when python-systemd is unavailable.
    LogHandler = logging.StreamHandler


log = logging.getLogger('fetchcert')

# NOTE: raises KeyError at import time when CREDENTIALS_DIRECTORY is unset.
CREDENTIALS_DIRECTORY = Path(os.environ['CREDENTIALS_DIRECTORY'])
CA_FILE = Path('/etc/pki/ca-trust/kube-root-ca.crt')
CERT_OUT = Path('/etc/nextcloud/postgresql.cer')
KEY_OUT = Path('/etc/nextcloud/postgresql.key')

BASE_URL = 'https://kubernetes.pyrocufflink.blue:6443'
NAMESPACE = 'postgresql-ca'
SECRET = 'nextcloud-client'


def _stage_if_changed(target, staging, data, mode, label):
    """Write *data* to *staging* unless *target* already contains it.

    The staging file is chowned to uid 0 / gid 48 and chmodded to *mode*
    before any content is written.

    Args:
        target: Path of the installed file to compare against.
        staging: Path of the temporary file to write.
        data: Bytes that the installed file should contain.
        mode: Permission bits to apply to the staging file.
        label: Human-readable description used in the debug log message.

    Returns:
        True if the staging file was written by this call, False if
        *target* was already up to date.
    """
    if target.exists() and target.read_bytes() == data:
        return False
    log.debug('Writing %s to %s', label, staging)
    # Create the file owner-only so nobody else can open it in the window
    # between creation and the fchmod below (matters for the private key).
    fd = os.open(staging, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        # NOTE(review): gid 48 is presumably the service's group -- confirm.
        os.fchown(fd, 0, 48)
        os.fchmod(fd, mode)
        f.write(data)
    return True


def main():
    """Fetch the Secret and install the certificate and key if they changed.

    Raises:
        SystemExit: with status 1 when the API request fails, either with
            an HTTP error status or because the server could not be reached.
    """
    if 'LOG_LEVEL' in os.environ:
        logging.root.setLevel(os.environ['LOG_LEVEL'].upper())
    logging.root.addHandler(LogHandler())

    token_path = CREDENTIALS_DIRECTORY / 'nextcloud.fetchcert.token'
    log.debug('Reading token from %s', token_path)
    token = token_path.read_text().rstrip()

    # Trust only the CA in CA_FILE when verifying the API server.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.load_verify_locations(cafile=CA_FILE)

    url = f'{BASE_URL}/api/v1/namespaces/{NAMESPACE}/secrets/{SECRET}'
    headers = {
        'Authorization': f'Bearer {token}'
    }
    req = urllib.request.Request(url, headers=headers)
    log.info('Fetching Secret from %s', url)
    try:
        res = urllib.request.urlopen(req, context=ctx)
    except urllib.error.HTTPError as e:
        log.error('%s %s', e, e.read())
        raise SystemExit(1)
    except urllib.error.URLError as e:
        # Connection-level failure (DNS, TLS, refused, ...): no HTTP body.
        log.error('Failed to fetch %s: %s', url, e.reason)
        raise SystemExit(1)
    with res:
        body = json.load(res)
    log.info('Received HTTP response %d %s', res.status, res.reason)

    # Secret values are base64-encoded in the API response.
    cert = base64.b64decode(body['data']['tls.crt'].encode('ascii'))
    key = base64.b64decode(body['data']['tls.key'].encode('ascii'))

    cert_new = CERT_OUT.with_suffix(f'{CERT_OUT.suffix}.new')
    key_new = KEY_OUT.with_suffix(f'{KEY_OUT.suffix}.new')

    # Stage both files first and rename afterwards, so a failure while
    # writing either one leaves the installed pair untouched.
    cert_staged = _stage_if_changed(
        CERT_OUT, cert_new, cert, 0o0444, 'certificate'
    )
    key_staged = _stage_if_changed(
        KEY_OUT, key_new, key, 0o0440, 'private key'
    )

    # Only rename what this run wrote: a leftover ".new" file from an
    # earlier aborted run must not replace an up-to-date installed file.
    if cert_staged:
        log.debug('Renaming %s to %s', cert_new, CERT_OUT)
        cert_new.rename(CERT_OUT)
    if key_staged:
        log.debug('Renaming %s to %s', key_new, KEY_OUT)
        key_new.rename(KEY_OUT)

    log.info('Successfully fetched certificate from Kubernetes Secret')


if __name__ == '__main__':
    main()