84 lines
2.7 KiB
Python
84 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import ssl
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from systemd.journal import JournalHandler as LogHandler
|
|
except ModuleNotFoundError:
|
|
LogHandler = logging.StreamHandler
|
|
|
|
|
|
log = logging.getLogger('fetchcert')
|
|
|
|
|
|
CREDENTIALS_DIRECTORY = Path(os.environ['CREDENTIALS_DIRECTORY'])
|
|
|
|
CA_FILE = Path('/etc/pki/ca-trust/kube-root-ca.crt')
|
|
CERT_OUT = Path('/etc/nextcloud/postgresql.cer')
|
|
KEY_OUT = Path('/etc/nextcloud/postgresql.key')
|
|
|
|
BASE_URL = 'https://kubernetes.pyrocufflink.blue:6443'
|
|
NAMESPACE = 'postgresql-ca'
|
|
SECRET = 'nextcloud-client'
|
|
|
|
|
|
def main():
|
|
if 'LOG_LEVEL' in os.environ:
|
|
logging.root.setLevel(os.environ['LOG_LEVEL'].upper())
|
|
logging.root.addHandler(LogHandler())
|
|
|
|
token_path = CREDENTIALS_DIRECTORY / 'nextcloud.fetchcert.token'
|
|
log.debug('Reading token from %s', token_path)
|
|
token = token_path.read_text().rstrip()
|
|
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.load_verify_locations(cafile=CA_FILE)
|
|
|
|
url = f'{BASE_URL}/api/v1/namespaces/{NAMESPACE}/secrets/{SECRET}'
|
|
headers = {
|
|
'Authorization': f'Bearer {token}'
|
|
}
|
|
req = urllib.request.Request(url, headers=headers)
|
|
log.info('Fetching Secret from %s', url)
|
|
try:
|
|
res = urllib.request.urlopen(req, context=ctx)
|
|
except urllib.error.HTTPError as e:
|
|
log.error('%s %s', e, e.read())
|
|
raise SystemExit(1)
|
|
with res:
|
|
body = json.load(res)
|
|
log.info('Received HTTP reponse %d %s', res.status, res.reason)
|
|
cert = base64.b64decode(body['data']['tls.crt'].encode('ascii'))
|
|
key = base64.b64decode(body['data']['tls.key'].encode('ascii'))
|
|
cert_new = CERT_OUT.with_suffix(f'{CERT_OUT.suffix}.new')
|
|
key_new = KEY_OUT.with_suffix(f'{KEY_OUT.suffix}.new')
|
|
if not CERT_OUT.exists() or CERT_OUT.read_bytes() != cert:
|
|
log.debug('Writing certificate to %s', cert_new)
|
|
with cert_new.open('wb') as f:
|
|
os.fchown(f.fileno(), 0, 48)
|
|
os.fchmod(f.fileno(), 0o0444)
|
|
f.write(cert)
|
|
if not KEY_OUT.exists() or KEY_OUT.read_bytes() != key:
|
|
log.debug('Writing private key to %s', key_new)
|
|
with key_new.open('wb') as f:
|
|
os.fchown(f.fileno(), 0, 48)
|
|
os.fchmod(f.fileno(), 0o0440)
|
|
f.write(key)
|
|
if cert_new.exists():
|
|
log.debug('Renaming %s to %s', cert_new, CERT_OUT)
|
|
cert_new.rename(CERT_OUT)
|
|
if key_new.exists():
|
|
log.debug('Renaming %s to %s', key_new, KEY_OUT)
|
|
key_new.rename(KEY_OUT)
|
|
log.info('Successfully fetched certificate from Kubernetes Secret')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|