configpolicy/roles/nextcloud-db-cert/files/fetch-cert.py

84 lines
2.7 KiB
Python

#!/usr/bin/env python3
import base64
import json
import logging
import os
import ssl
import urllib.error
import urllib.request
from pathlib import Path
# Log to the systemd journal when the systemd Python bindings can be
# imported; otherwise fall back to a plain stream handler.
try:
    from systemd.journal import JournalHandler as LogHandler
except ModuleNotFoundError:
    LogHandler = logging.StreamHandler


# Logger used for every message emitted by this script.
log = logging.getLogger('fetchcert')
# Directory containing the API bearer token.  Taken from the environment;
# importing this module raises KeyError when the variable is not set.
CREDENTIALS_DIRECTORY = Path(os.environ['CREDENTIALS_DIRECTORY'])

# CA bundle used to verify the API server's TLS certificate.
CA_FILE = Path('/etc/pki/ca-trust/kube-root-ca.crt')

# Destination paths for the fetched certificate and private key.
CERT_OUT = Path('/etc/nextcloud/postgresql.cer')
KEY_OUT = Path('/etc/nextcloud/postgresql.key')

# Location of the Secret: API server base URL, namespace and Secret name.
BASE_URL = 'https://kubernetes.pyrocufflink.blue:6443'
NAMESPACE = 'postgresql-ca'
SECRET = 'nextcloud-client'
def _differs(path, data):
    """Return True when *path* is missing or its content is not *data*."""
    try:
        return path.read_bytes() != data
    except FileNotFoundError:
        return True


def _stage(path, data, mode):
    """Write *data* to a freshly created file at *path*.

    The file is created with mode 0600 so that no other user can open it
    before its final ownership and permissions are applied, then chowned
    to uid 0 / gid 48 and chmodded to *mode* before the data is written.
    """
    # Drop any leftover from an interrupted earlier run so that O_EXCL
    # below always creates a brand new inode with the restrictive mode.
    try:
        path.unlink()
    except FileNotFoundError:
        pass
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, 'wb') as f:
        # NOTE(review): gid 48 is presumably the web server group that
        # runs Nextcloud -- confirm against the target host.
        os.fchown(f.fileno(), 0, 48)
        os.fchmod(f.fileno(), mode)
        f.write(data)


def main():
    """Fetch the TLS client certificate and key from a Kubernetes Secret.

    Reads a bearer token from ``CREDENTIALS_DIRECTORY``, requests the
    Secret ``SECRET`` in ``NAMESPACE`` from the API server at ``BASE_URL``
    (verified against ``CA_FILE``), and installs the decoded ``tls.crt``
    and ``tls.key`` values at ``CERT_OUT`` and ``KEY_OUT``.  A file is
    rewritten only when its content differs from what was fetched; new
    content is staged in a ``.new`` sibling and then renamed into place.

    Raises:
        SystemExit: with status 1 when the HTTP request fails.
    """
    if 'LOG_LEVEL' in os.environ:
        logging.root.setLevel(os.environ['LOG_LEVEL'].upper())
    logging.root.addHandler(LogHandler())

    token_path = CREDENTIALS_DIRECTORY / 'nextcloud.fetchcert.token'
    log.debug('Reading token from %s', token_path)
    token = token_path.read_text().rstrip()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.load_verify_locations(cafile=CA_FILE)

    url = f'{BASE_URL}/api/v1/namespaces/{NAMESPACE}/secrets/{SECRET}'
    headers = {
        'Authorization': f'Bearer {token}',
    }
    req = urllib.request.Request(url, headers=headers)
    log.info('Fetching Secret from %s', url)
    try:
        res = urllib.request.urlopen(req, context=ctx)
    except urllib.error.HTTPError as e:
        log.error('%s %s', e, e.read())
        raise SystemExit(1)
    except urllib.error.URLError as e:
        # Connection-level failures (DNS, refused connection, TLS
        # verification) carry no HTTP response; HTTPError is a subclass
        # of URLError, so it must be handled first, above.
        log.error('Failed to fetch %s: %s', url, e.reason)
        raise SystemExit(1)
    with res:
        # Log the status before parsing so it is recorded even when the
        # body turns out not to be valid JSON.
        log.info('Received HTTP response %d %s', res.status, res.reason)
        body = json.load(res)

    cert = base64.b64decode(body['data']['tls.crt'].encode('ascii'))
    key = base64.b64decode(body['data']['tls.key'].encode('ascii'))

    cert_new = CERT_OUT.with_suffix(f'{CERT_OUT.suffix}.new')
    key_new = KEY_OUT.with_suffix(f'{KEY_OUT.suffix}.new')

    # Only files staged during this run are renamed into place; a stale
    # ``.new`` file left behind by a failed run is never installed.
    staged = []
    if _differs(CERT_OUT, cert):
        log.debug('Writing certificate to %s', cert_new)
        _stage(cert_new, cert, 0o0444)
        staged.append((cert_new, CERT_OUT))
    if _differs(KEY_OUT, key):
        log.debug('Writing private key to %s', key_new)
        _stage(key_new, key, 0o0440)
        staged.append((key_new, KEY_OUT))

    # Stage everything first, then swap, so a write failure leaves both
    # installed files untouched.
    for src, dest in staged:
        log.debug('Renaming %s to %s', src, dest)
        src.rename(dest)

    log.info('Successfully fetched certificate from Kubernetes Secret')
# Run the fetch only when executed as a script, not when imported.
if __name__ == '__main__':
    main()