136 lines
4.1 KiB
Python
Executable File
136 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import base64
|
|
import dataclasses
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from typing import Optional
|
|
from pathlib import Path
|
|
|
|
import git
|
|
import kubernetes.client
|
|
import kubernetes.config
|
|
import ruamel.yaml
|
|
|
|
|
|
# Logger shared by every function in this script.
log = logging.getLogger("cert-exporter")

# Debug-level logging is switched on only when CERT_EXPORTER_DEBUG is exactly "1".
DEBUG = os.getenv("CERT_EXPORTER_DEBUG") == "1"

# Location of the YAML configuration file; CERT_EXPORTER_CONFIG overrides the default.
CONFIG_FILE = os.getenv("CERT_EXPORTER_CONFIG", "/etc/cert-exporter/config.yml")
|
|
|
|
|
|
@dataclasses.dataclass
class CertConfig:
    """Describes one certificate to export and the files it is written to."""

    # Name of the Kubernetes Secret the certificate is read from.
    name: str
    # Namespace that contains the Secret.
    namespace: str
    # Destination file for the Secret's ``tls.key`` value.
    key: Path
    # Destination file for the Secret's ``tls.crt`` value.
    cert: Path
    # Optional destination for a combined file (key followed by certificate).
    bundle: Optional[Path] = None
|
|
|
|
|
|
@dataclasses.dataclass
class Configuration:
    """Top-level settings for the exporter, normally built by :meth:`load`."""

    # Value of the required ``git_repo`` configuration key.
    git_repo: str
    # One entry per item of the optional ``certs`` configuration list.
    certs: list[CertConfig] = dataclasses.field(default_factory=list)

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "Configuration":
        """Build a configuration from a YAML file.

        Args:
            path: File to read.  When omitted, ``CONFIG_FILE`` is used.

        Returns:
            The populated configuration object.

        Raises:
            OSError: If the file cannot be opened.
            KeyError: If ``git_repo``, or the ``name``/``namespace``/``key``/
                ``cert`` entry of a certificate, is missing.
        """
        if path is None:
            path = Path(CONFIG_FILE)
        with path.open("r", encoding="utf-8") as f:
            # The module-level ``ruamel.yaml.safe_load`` function is the
            # legacy API; the YAML class with ``typ="safe"`` is the supported
            # way to get the same safe loading behaviour.
            values = ruamel.yaml.YAML(typ="safe").load(f)
        # An empty document loads as None; treat it as an empty mapping so the
        # failure below is a KeyError naming the missing key.
        values = values or {}

        config = cls(git_repo=values["git_repo"])
        # ``certs`` may be absent or explicitly null; both mean "no certs".
        for cert in values.get("certs") or ():
            # ``bundle`` is optional: only wrap it in Path when it is set,
            # because Path(None) raises TypeError.
            bundle = cert.get("bundle")
            config.certs.append(
                CertConfig(
                    name=cert["name"],
                    namespace=cert["namespace"],
                    key=Path(cert["key"]),
                    cert=Path(cert["cert"]),
                    bundle=Path(bundle) if bundle is not None else None,
                )
            )
        return config
|
|
|
|
|
|
def _write_file(path: Path, message: str, *chunks: bytes) -> None:
    """Write *chunks* in order to *path*, creating parent directories first.

    *message* is a logging format string with one ``%s`` placeholder that
    receives the name of the opened file.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as f:
        log.info(message, f.name)
        for chunk in chunks:
            f.write(chunk)


def update_cert(cert: CertConfig, api: kubernetes.client.ApiClient) -> None:
    """Export one TLS Secret to the files named in *cert*.

    Reads the Secret ``cert.name`` in ``cert.namespace``, base64-decodes its
    ``tls.key`` and ``tls.crt`` values and writes them to ``cert.key`` and
    ``cert.cert``.  When ``cert.bundle`` is set, a third file containing the
    key followed by the certificate is written as well.

    Args:
        cert: Which Secret to read and where to write its contents.
        api: Kubernetes API client used to fetch the Secret.

    Raises:
        KeyError: If the Secret has no ``tls.key`` or ``tls.crt`` entry.
        OSError: If a destination file or directory cannot be written.

    A failure to read the Secret (``ApiException``) is logged and the
    function returns without writing anything.
    """
    core = kubernetes.client.CoreV1Api(api)
    log.info(
        "Fetching certificate from Secret %s in namespace %s",
        cert.name,
        cert.namespace,
    )
    try:
        secret = core.read_namespaced_secret(cert.name, cert.namespace)
    except kubernetes.client.ApiException as e:
        log.error(
            "Could not get certificate from Secret %s in namespace %s: %s",
            cert.name,
            cert.namespace,
            e,
        )
        return

    # A Secret without any data has ``data`` set to None; fall back to an
    # empty mapping so the error names the missing field.
    data = secret.data or {}
    key = base64.b64decode(data["tls.key"])
    crt = base64.b64decode(data["tls.crt"])

    _write_file(cert.key, "Writing certificate private key to %s", key)
    _write_file(cert.cert, "Writing certificate to %s", crt)
    if cert.bundle is not None:
        # The bundle is the private key immediately followed by the certificate.
        _write_file(
            cert.bundle, "Writing certificate bundle to %s", key, crt
        )
|
|
|
|
|
|
def main():
    """Export all configured certificates and push them to the Git repository.

    Loads the configuration and the Kubernetes client settings, clones the
    configured repository into a temporary directory, writes every configured
    certificate, and commits and pushes the result when anything changed.
    A failure for one certificate is logged and does not stop the others.
    """
    logging.basicConfig(
        level=logging.DEBUG if DEBUG else logging.INFO,
    )
    # NOTE(review): presumably keeps the Kubernetes REST client's debug
    # output out of the log even when DEBUG is on -- confirm.
    logging.getLogger("kubernetes.client.rest").setLevel(logging.INFO)

    config = Configuration.load()
    kubernetes.config.load_config()
    previous_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as d:
        try:
            # Work inside the clone so relative paths from the configuration
            # resolve against the repository checkout.
            os.chdir(d)
            log.debug("Cloning Git repo %s to %s", config.git_repo, d)
            repo = git.Repo.clone_from(config.git_repo, d)
            with kubernetes.client.ApiClient() as k:
                log.debug(
                    "Using Kubernetes API server %s", k.configuration.host
                )
                for cert in config.certs:
                    try:
                        update_cert(cert, k)
                    except Exception as e:
                        # Best effort: keep going with the remaining certs.
                        log.error(
                            "Failed to update certificate %s: %s", cert.name, e
                        )
            # Untracked files must be included, otherwise certificates that
            # are exported for the first time would never be committed.
            if repo.is_dirty(untracked_files=True):
                log.info("Committing updated certificates")
                changed = [diff.b_path for diff in repo.index.diff(None)]
                changed.extend(repo.untracked_files)
                repo.index.add(changed)
                repo.index.commit("Update certificates")
                log.info("Pushing new refs to origin remote")
                repo.remotes.origin.push().raise_if_error()
                log.info("Successfully updated certificates")
            else:
                log.info("No certificates to update")
        finally:
            # Leave the temporary directory before it is removed so the
            # process is not left with a deleted working directory.
            os.chdir(previous_cwd)
|
|
|
|
|
|
# Run the exporter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|