cert-exporter/cert-exporter.py

136 lines
4.1 KiB
Python
Executable File

#!/usr/bin/env python3
import base64
import dataclasses
import logging
import os
import tempfile
from typing import Optional
from pathlib import Path
import git
import kubernetes.client
import kubernetes.config
import ruamel.yaml
# Logger shared by every function in this script.
log = logging.getLogger("cert-exporter")

# Debug logging is enabled only when CERT_EXPORTER_DEBUG is exactly "1".
DEBUG = os.getenv("CERT_EXPORTER_DEBUG") == "1"

# Path of the configuration file; CERT_EXPORTER_CONFIG overrides the default.
CONFIG_FILE = os.getenv(
    "CERT_EXPORTER_CONFIG",
    "/etc/cert-exporter/config.yml",
)
@dataclasses.dataclass
class CertConfig:
    """Describe one Kubernetes Secret and the files it is exported to.

    ``name`` and ``namespace`` identify the Secret. ``key`` and ``cert`` are
    the files that receive the private key and the certificate; ``bundle``,
    when set, receives both concatenated into a single file.
    """

    name: str
    namespace: str
    key: Path
    cert: Path
    bundle: Optional[Path] = None


@dataclasses.dataclass
class Configuration:
    """Settings for one run: the Git repository and the certificates."""

    git_repo: str
    certs: list[CertConfig] = dataclasses.field(default_factory=list)

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "Configuration":
        """Read the configuration from a YAML file.

        Args:
            path: File to read. When ``None``, ``CONFIG_FILE`` is used.

        Returns:
            The parsed configuration.

        Raises:
            OSError: If the file cannot be opened.
            KeyError: If ``git_repo`` or a required certificate field
                (``name``, ``namespace``, ``key``, ``cert``) is missing.
        """
        if path is None:
            path = Path(CONFIG_FILE)
        # The module-level ruamel.yaml.safe_load() function is deprecated;
        # a YAML(typ="safe") instance is the supported way to safe-load.
        yaml = ruamel.yaml.YAML(typ="safe")
        with path.open("r", encoding="utf-8") as f:
            values = yaml.load(f)
        return cls._from_mapping(values)

    @classmethod
    def _from_mapping(cls, values: dict) -> "Configuration":
        """Build a configuration from the already-parsed YAML mapping."""
        config = cls(git_repo=values["git_repo"])
        # "certs" may be absent or explicitly null; both mean no certificates.
        for cert in values.get("certs") or ():
            bundle = cert.get("bundle")
            config.certs.append(
                CertConfig(
                    name=cert["name"],
                    namespace=cert["namespace"],
                    key=Path(cert["key"]),
                    cert=Path(cert["cert"]),
                    # Path(None) raises TypeError, so an omitted bundle must
                    # stay None instead of being passed through Path().
                    bundle=Path(bundle) if bundle is not None else None,
                )
            )
        return config
def _write_file(path: Path, data: bytes, log_format: str) -> None:
    """Write *data* to *path*, creating missing parent directories first.

    *log_format* is a logging format string with one ``%s`` placeholder that
    receives the name of the opened file.
    """
    # exist_ok=True replaces a separate exists() check, which could race with
    # another process creating the directory in between.
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as f:
        log.info(log_format, f.name)
        f.write(data)


def update_cert(cert: CertConfig, api: kubernetes.client.ApiClient) -> None:
    """Fetch one TLS Secret and write its key and certificate to disk.

    Args:
        cert: The Secret to read and the files to write.
        api: Kubernetes API client used to read the Secret.

    If the Secret cannot be read (``ApiException``), the error is logged and
    the function returns without writing anything.

    Raises:
        KeyError: If the Secret has no ``tls.key`` or ``tls.crt`` entry.
        OSError: If a target file or directory cannot be written.
    """
    core = kubernetes.client.CoreV1Api(api)
    log.info(
        "Fetching certificate from Secret %s in namespace %s",
        cert.name,
        cert.namespace,
    )
    try:
        secret = core.read_namespaced_secret(cert.name, cert.namespace)
    except kubernetes.client.ApiException as e:
        log.error(
            "Could not get certificate from Secret %s in namespace %s: %s",
            cert.name,
            cert.namespace,
            e,
        )
        return

    # Decode both values before touching the filesystem so that a Secret
    # missing either entry does not leave a half-written export behind.
    key = base64.b64decode(secret.data["tls.key"])
    crt = base64.b64decode(secret.data["tls.crt"])

    _write_file(cert.key, key, "Writing certificate private key to %s")
    _write_file(cert.cert, crt, "Writing certificate to %s")
    if cert.bundle is not None:
        # The bundle is the private key immediately followed by the certificate.
        _write_file(cert.bundle, key + crt, "Writing certificate bundle to %s")
def main():
    """Export all configured certificates into the Git repository.

    Loads the configuration, clones the repository into a temporary
    directory, writes every configured certificate there, and commits and
    pushes the result when anything changed. A failure to update a single
    certificate is logged and does not stop the remaining ones.
    """
    logging.basicConfig(
        level=logging.DEBUG if DEBUG else logging.INFO,
    )
    # The Kubernetes REST client logger is pinned to INFO even in debug mode
    # (presumably to keep its request/response dumps out of the log).
    logging.getLogger("kubernetes.client.rest").setLevel(logging.INFO)
    config = Configuration.load()
    kubernetes.config.load_config()
    with tempfile.TemporaryDirectory() as d:
        # Work inside the clone so relative paths from the configuration
        # resolve against the repository's working tree.
        os.chdir(d)
        log.debug("Cloning Git repo %s to %s", config.git_repo, d)
        repo = git.Repo.clone_from(config.git_repo, d)
        with kubernetes.client.ApiClient() as k:
            log.debug("Using Kubernetes API server %s", k.configuration.host)
            for cert in config.certs:
                try:
                    update_cert(cert, k)
                except Exception as e:
                    # Best effort: one broken certificate must not prevent
                    # the others from being exported.
                    log.error(
                        "Failed to update certificate %s: %s", cert.name, e
                    )
        # is_dirty() ignores untracked files unless asked, and
        # index.diff(None) only reports tracked files. Without the two
        # adjustments below, certificates that do not exist in the repository
        # yet would never be committed.
        if repo.is_dirty(untracked_files=True):
            log.info("Committing updated certificates")
            changed = [diff.b_path for diff in repo.index.diff(None)]
            changed.extend(repo.untracked_files)
            repo.index.add(changed)
            repo.index.commit("Update certificates")
            log.info("Pushing new refs to origin remote")
            repo.remotes.origin.push().raise_if_error()
            log.info("Successfully updated certificates")
        else:
            log.info("No certificates to update")
# Run the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()