Initial commit

master
Dustin 2023-04-23 16:28:28 -05:00
commit 9d6b54fc2b
2 changed files with 154 additions and 0 deletions

19
Containerfile Normal file
View File

@ -0,0 +1,19 @@
FROM registry.fedoraproject.org/fedora-minimal:38
RUN microdnf install -y \
git-core \
openssh-clients \
python3 \
python3-GitPython \
python3-kubernetes \
python3-ruamel-yaml \
&& microdnf clean all \
&& groupadd -g 1000 cert-exporter \
&& useradd -m -u 1000 -g 1000 cert-exporter \
&& :
USER 1000:1000
COPY cert-exporter.py /usr/local/bin/cert-exporter
ENTRYPOINT ["/usr/local/bin/cert-exporter"]

135
cert-exporter.py Executable file
View File

@ -0,0 +1,135 @@
#!/usr/bin/env python3
import base64
import dataclasses
import logging
import os
import tempfile
from typing import Optional
from pathlib import Path
import git
import kubernetes.client
import kubernetes.config
import ruamel.yaml
log = logging.getLogger("cert-exporter")
DEBUG = os.environ.get("CERT_EXPORTER_DEBUG") == "1"
CONFIG_FILE = os.environ.get(
"CERT_EXPORTER_CONFIG",
"/etc/cert-exporter/config.yml",
)
@dataclasses.dataclass
class CertConfig:
name: str
namespace: str
key: Path
cert: Path
bundle: Optional[Path] = None
@dataclasses.dataclass
class Configuration:
git_repo: str
certs: list[CertConfig] = dataclasses.field(default_factory=list)
@classmethod
def load(cls, path: Optional[Path] = None) -> "Configuration":
if path is None:
path = Path(CONFIG_FILE)
with path.open("r", encoding="utf-8") as f:
values = ruamel.yaml.safe_load(f)
config = Configuration(
git_repo=values["git_repo"],
)
for cert in values.get("certs", ()):
config.certs.append(
CertConfig(
name=cert["name"],
namespace=cert["namespace"],
key=Path(cert["key"]),
cert=Path(cert["cert"]),
bundle=Path(cert.get("bundle")),
)
)
return config
def update_cert(cert: CertConfig, api: kubernetes.client.ApiClient) -> None:
core = kubernetes.client.CoreV1Api(api)
log.info(
"Fetching certificate from Secret %s in namespace %s",
cert.name,
cert.namespace,
)
try:
secret = core.read_namespaced_secret(cert.name, cert.namespace)
except kubernetes.client.ApiException as e:
log.error(
"Could not get certificate from Secret %s in namespace %s: %s",
cert.name,
cert.namespace,
e,
)
return
key = base64.b64decode(secret.data["tls.key"])
crt = base64.b64decode(secret.data["tls.crt"])
if not cert.key.parent.exists():
cert.key.parent.mkdir(parents=True)
with cert.key.open("wb") as f:
log.info("Writing certificate private key to %s", f.name)
f.write(key)
if not cert.cert.parent.exists():
cert.cert.parent.mkdir(parents=True)
with cert.cert.open("wb") as f:
log.info("Writing certificate to %s", f.name)
f.write(crt)
if cert.bundle is not None:
if not cert.bundle.parent.exists():
cert.bundle.parent.mkdir(parents=True)
with cert.bundle.open("wb") as f:
log.info("Writing certificate bundle to %s", f.name)
f.write(key)
f.write(crt)
def main():
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
)
logging.getLogger("kubernetes.client.rest").setLevel(logging.INFO)
config = Configuration.load()
kubernetes.config.load_config()
with tempfile.TemporaryDirectory() as d:
os.chdir(d)
log.debug("Cloning Git repo %s to %s", config.git_repo, d)
repo = git.Repo.clone_from(config.git_repo, d)
with kubernetes.client.ApiClient() as k:
log.debug("Using Kubernetes API server %s", k.configuration.host)
for cert in config.certs:
try:
update_cert(cert, k)
except Exception as e:
log.error(
"Failed to update certificate %s: %s", cert.name, e
)
continue
if repo.is_dirty():
log.info("Committing updated certificates")
for diff in repo.index.diff(None):
repo.index.add(diff.b_path)
repo.index.commit("Update certificates")
log.info("Pushing new refs to origin remote")
repo.remotes.origin.push().raise_if_error()
log.info("Successfully updated certificates")
else:
log.info("No certificates to update")
if __name__ == "__main__":
main()