Currently, the certificate authority that issues certificates for PostgreSQL clients is hosted in Kubernetes and managed by _cert-manager_. Certificates it issues are stored in Kubernetes Secret resources, making them easy to consume by applications running in the cluster, but not for anything outside. Since Nextcloud runs on its own VM, we need a way to get the certificate out of the Secret and into a file on that machine. To that end, I've written the `nextcloud-fetch-cert.py` script. This script uses a Kubernetes Service Account token to authenticate to the Kubernetes API and download the contents of the Secret. It runs periodically, triggered by a systemd timer unit, to ensure the certificate is always up-to-date. The obvious drawback to this approach is the requirement for a static token. Since there's not really a way to "renew" Service Account tokens, it needs to be issued with a fairly long duration, to mitigate the risk of being unable to fetch a new certificate once it has expired because the token has also expired. This somewhat negates the advantage of using certificates for authentication, since now the machine needs a static, pre-defined secret. At some point, I may deploy another instance of _step-ca_ to manage the PostgreSQL client CA. Clients can then use e.g. `certbot` or `step ca certificate` to obtain their certificates. I chose not to implement this yet, though for a couple of reasons. First, I need to move the Nextcloud database very soon, so we switch to using `restic` for backups without having to deal with the database. Second, I am still considering moving Nextcloud into Kubernetes eventually, where it will be able to get the Secret directly; since Nextcloud is the only client outside the cluster, it may not be worth setting up _step-ca_ in that case.
84 lines
2.7 KiB
Python
84 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import ssl
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from systemd.journal import JournalHandler as LogHandler
|
|
except ModuleNotFoundError:
|
|
LogHandler = logging.StreamHandler
|
|
|
|
|
|
log = logging.getLogger('fetchcert')
|
|
|
|
|
|
CREDENTIALS_DIRECTORY = Path(os.environ['CREDENTIALS_DIRECTORY'])
|
|
|
|
CA_FILE = Path('/etc/pki/ca-trust/kube-root-ca.crt')
|
|
CERT_OUT = Path('/etc/nextcloud/postgresql.cer')
|
|
KEY_OUT = Path('/etc/nextcloud/postgresql.key')
|
|
|
|
BASE_URL = 'https://kubernetes.pyrocufflink.blue:6443'
|
|
NAMESPACE = 'postgresql-ca'
|
|
SECRET = 'nextcloud-client'
|
|
|
|
|
|
def main():
|
|
if 'LOG_LEVEL' in os.environ:
|
|
logging.root.setLevel(os.environ['LOG_LEVEL'].upper())
|
|
logging.root.addHandler(LogHandler())
|
|
|
|
token_path = CREDENTIALS_DIRECTORY / 'nextcloud.fetchcert.token'
|
|
log.debug('Reading token from %s', token_path)
|
|
token = token_path.read_text().rstrip()
|
|
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.load_verify_locations(cafile=CA_FILE)
|
|
|
|
url = f'{BASE_URL}/api/v1/namespaces/{NAMESPACE}/secrets/{SECRET}'
|
|
headers = {
|
|
'Authorization': f'Bearer {token}'
|
|
}
|
|
req = urllib.request.Request(url, headers=headers)
|
|
log.info('Fetching Secret from %s', url)
|
|
try:
|
|
res = urllib.request.urlopen(req, context=ctx)
|
|
except urllib.error.HTTPError as e:
|
|
log.error('%s %s', e, e.read())
|
|
raise SystemExit(1)
|
|
with res:
|
|
body = json.load(res)
|
|
log.info('Received HTTP reponse %d %s', res.status, res.reason)
|
|
cert = base64.b64decode(body['data']['tls.crt'].encode('ascii'))
|
|
key = base64.b64decode(body['data']['tls.key'].encode('ascii'))
|
|
cert_new = CERT_OUT.with_suffix(f'{CERT_OUT.suffix}.new')
|
|
key_new = KEY_OUT.with_suffix(f'{KEY_OUT.suffix}.new')
|
|
if not CERT_OUT.exists() or CERT_OUT.read_bytes() != cert:
|
|
log.debug('Writing certificate to %s', cert_new)
|
|
with cert_new.open('wb') as f:
|
|
os.fchown(f.fileno(), 0, 48)
|
|
os.fchmod(f.fileno(), 0o0444)
|
|
f.write(cert)
|
|
if not KEY_OUT.exists() or KEY_OUT.read_bytes() != key:
|
|
log.debug('Writing private key to %s', key_new)
|
|
with key_new.open('wb') as f:
|
|
os.fchown(f.fileno(), 0, 48)
|
|
os.fchmod(f.fileno(), 0o0440)
|
|
f.write(key)
|
|
if cert_new.exists():
|
|
log.debug('Renaming %s to %s', cert_new, CERT_OUT)
|
|
cert_new.rename(CERT_OUT)
|
|
if key_new.exists():
|
|
log.debug('Renaming %s to %s', key_new, KEY_OUT)
|
|
key_new.rename(KEY_OUT)
|
|
log.info('Successfully fetched certificate from Kubernetes Secret')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|