#!/usr/bin/env python
"""Update container image tags in a Git repository and open pull requests.

For every configured project the script looks up the latest upstream
version of each image, rewrites the project's Kustomize configuration,
commits the change on an ``updatebot/<project>`` branch, pushes it and
creates a pull request through the repository's HTTP API.
"""
import abc
import argparse
import logging
import functools
import os
import re
import subprocess
import tempfile
import threading
import urllib.parse
from pathlib import Path
from typing import ClassVar, Iterable, Literal, Union, Optional

import colorlog
import pydantic
import git
import requests
import ruamel.yaml


log = logging.getLogger('updatebot')

# Custom log level below DEBUG, used for dumping raw HTTP response bodies.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if 'XDG_CONFIG_HOME' in os.environ
    else Path('~/.config').expanduser()
)

# Seconds to wait for an HTTP response; without a timeout ``requests``
# blocks indefinitely on an unresponsive server.
HTTP_TIMEOUT = 30

tls = threading.local()


def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use."""
    if hasattr(tls, 'session'):
        session = tls.session
    else:
        log.debug('Starting new HTTP/HTTPS client session')
        session = tls.session = requests.Session()
    return session


class BaseSource(abc.ABC, pydantic.BaseModel):
    """Interface for a place where upstream versions can be looked up."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the latest available version string."""
        raise NotImplementedError


class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    kind: Literal['github']
    organization: str
    repo: str
    # Must define a named group ``version``; it is searched for anywhere
    # in the release name.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version extracted from the latest release.

        The release name is used, falling back to the tag name when the
        release has no name.  If the text does not match ``version_re``
        a warning is logged and the text is returned unchanged.

        Raises:
            requests.HTTPError: the API answered with an error status.
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url, timeout=HTTP_TIMEOUT)
        # Fail with the real HTTP error instead of a KeyError further down.
        r.raise_for_status()
        release = r.json()
        # A release name may be null/empty; the tag name is always present.
        name = release.get('name') or release['tag_name']
        m = re.search(self.version_re, name)
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                name,
                self.version_re,
            )
            return name
        return m.groupdict()['version']


class DockerHubSource(BaseSource):
    """Version source backed by the tags of a Docker Hub repository."""

    kind: Literal['docker']
    namespace: str
    repository: str
    # Must define a named group ``version``; matched from the start of
    # each tag name.
    version_re: str = r'^(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)$'

    def get_latest_version(self) -> str:
        """Return the most recently updated tag that matches ``version_re``.

        Raises:
            requests.HTTPError: the API answered with an error status.
            ValueError: no tag in the response matched ``version_re``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url, timeout=HTTP_TIMEOUT)
        r.raise_for_status()
        data = r.json()
        # NOTE(review): only this single response is inspected; if the
        # endpoint paginates, tags on later pages are not considered.
        versions = []
        regex = re.compile(self.version_re)
        for result in data['results']:
            m = regex.match(result['name'])
            if not m:
                log.debug(
                    'Skipping tag %s: does not match regex %s',
                    result['name'],
                    self.version_re,
                )
                continue
            version = m.groupdict()['version']
            versions.append((result['last_updated'], version))
        if not versions:
            raise ValueError(
                f'No tags of {self.namespace}/{self.repository} '
                f'match regex {self.version_re}'
            )
        # Tuples order by ``last_updated`` first, so the maximum is the
        # most recently updated matching tag.
        return max(versions)[-1]


Source = Union[
    GithubSource,
    DockerHubSource,
]


class ImageDef(pydantic.BaseModel):
    """An image to keep up to date and where to find its versions."""

    name: str
    image: str
    source: Source
    # Template for the image tag; ``{version}`` is replaced by the version.
    tag_format: str = '{version}'


class BaseProject(abc.ABC, pydantic.BaseModel):
    """Interface for a directory in the repository that can be updated."""

    name: str
    path: Path

    def __init__(self, **data) -> None:
        # ``path`` defaults to the project name when not given explicitly.
        data.setdefault('path', data.get('name'))
        super().__init__(**data)

    @abc.abstractmethod
    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Apply updates under *basedir*, yielding (image, version) pairs."""
        raise NotImplementedError

    @abc.abstractmethod
    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return a textual diff of the resulting resources, if available."""
        raise NotImplementedError


class KustomizeProject(BaseProject):
    """Project whose image tags live in a Kustomize configuration file."""

    kind: Literal['kustomize']
    images: list[ImageDef]
    # File names tried, in order, when locating the Kustomize config.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Set ``newTag`` for each configured image to its latest version.

        The file is re-read and rewritten once per image and the pair
        ``(image, version)`` is yielded after each write, so the caller
        can act on every image's change separately.

        Raises:
            ValueError: no Kustomize config file exists in the project.
        """
        path = basedir / self.path
        for filename in self.kustomize_files:
            filepath = path / filename
            if filepath.is_file():
                break
        else:
            raise ValueError(
                f'Could not find Kustomize config for {self.name}'
            )
        yaml = ruamel.yaml.YAML()
        for image in self.images:
            with filepath.open('rb') as f:
                kustomization = yaml.load(f)
            images = kustomization.setdefault('images', [])
            version = image.source.get_latest_version()
            new_tag = image.tag_format.format(version=version)
            for i in images:
                if i['name'] == image.image:
                    i['newTag'] = new_tag
                    break
            else:
                # No entry for this image yet: add one.
                images.append({'name': image.image, 'newTag': new_tag})
            with filepath.open('wb') as f:
                yaml.dump(kustomization, f)
            yield (image, version)

    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return the output of ``kubectl diff -k`` for the project.

        Returns ``None`` when ``kubectl`` cannot be executed, or when it
        exits non-zero without writing anything to standard output.
        """
        path = basedir / self.path
        cmd = ['kubectl', 'diff', '-k', path]
        try:
            p = subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,
                capture_output=True,
                check=False,
                encoding='utf-8',
            )
        except FileNotFoundError as e:
            log.warning('Cannot generate resource diff: %s', e)
            return None
        # A non-zero exit that still produced output is treated as a
        # usable diff; only a non-zero exit with no output is a failure.
        if p.returncode != 0 and not p.stdout:
            log.error('Failed to generate resource diff: %s', p.stderr)
            return None
        return p.stdout


Project = KustomizeProject


class RepoConfig(pydantic.BaseModel):
    """Location of, and credentials for, the Git repository to update."""

    url: str
    token_file: Optional[Path] = None
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository: ``/api/v1/repos`` + the URL's path."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> Optional[str]:
        """Token read from ``token_file``, or ``None`` if none is set."""
        if self.token_file:
            return self.token_file.read_text().strip()
        return None

    def get_git_url(self) -> str:
        """Return the repository's clone URL, preferring SSH when offered.

        Raises:
            requests.HTTPError: the API answered with an error status.
        """
        session = _get_session()
        headers = {}
        if token := self.auth_token:
            headers['Authorization'] = f'Bearer {token}'
        r = session.get(
            self.repo_api_url,
            headers=headers,
            timeout=HTTP_TIMEOUT,
        )
        r.raise_for_status()
        data = r.json()
        if ssh_url := data.get('ssh_url'):
            return ssh_url
        return data['clone_url']

    def create_pr(
        self,
        title: str,
        source_branch: str,
        target_branch: str,
        body: Optional[str] = None,
    ) -> None:
        """Create a pull request from *source_branch* into *target_branch*.

        The outcome is only logged; HTTP error statuses do not raise.
        """
        session = _get_session()
        headers = {}
        # Do not send a literal "token None" when no token is configured.
        if token := self.auth_token:
            headers['Authorization'] = f'token {token}'
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers=headers,
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
                'body': body,
            },
            timeout=HTTP_TIMEOUT,
        )
        log.log(TRACE, '%r', r.content)
        if 300 > r.status_code >= 200:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
        elif r.status_code == 409:
            # Conflict is reported as a warning rather than an error.
            data = r.json()
            log.warning('%s', data['message'])
        elif r.status_code < 500:
            data = r.json()
            log.error('Failed to create PR: %s', data['message'])
        else:
            log.error('Failed to create PR: %r', r.content)


class Config(pydantic.BaseModel):
    """Top-level configuration file schema."""

    repo: RepoConfig
    projects: list[Project]


class Arguments:
    """Namespace type for the parsed command-line arguments."""

    config: Path
    dry_run: bool
    projects: list[str]


def update_project(
    repo: git.Repo, project: Project
) -> tuple[Optional[str], Optional[str]]:
    """Apply and commit the updates of *project* in *repo*.

    One commit is created per image whose update changed the working
    tree.

    Returns:
        ``(title, diff)`` where *title* is the summary of the first
        commit made (``None`` if nothing was committed) and *diff* is
        the project's resource diff (``None`` if unavailable).
    """
    basedir = Path(repo.working_dir)
    title = None
    for image, version in project.apply_updates(basedir):
        log.info('Updating %s to %s', image.name, version)
        # diff(None) compares the index against the working tree.
        if repo.index.diff(None):
            log.debug('Committing changes to %s', project.path)
            repo.index.add(str(project.path))
            c = repo.index.commit(f'{image.name}: Update to {version}')
            log.info('Commited %s %s', str(c)[:7], c.summary)
            if not title:
                # The summary may be bytes-like; normalise it to str.
                if not isinstance(c.summary, str):
                    title = bytes(c.summary).decode('utf-8')
                else:
                    title = c.summary
        else:
            log.info('No changes to commit')
    diff = project.resource_diff(basedir)
    return title, diff


def parse_args() -> Arguments:
    """Parse the command line into an :class:`Arguments` namespace."""
    parser = argparse.ArgumentParser()
    # NOTE(review): the default file name ends in ``.toml`` but main()
    # parses the file as YAML; kept as-is for compatibility -- confirm.
    parser.add_argument(
        '--config',
        '-c',
        type=Path,
        default=XDG_CONFIG_HOME / 'updatebot' / 'config.toml',
    )
    parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    parser.add_argument('projects', metavar='project', nargs='*', default=[])
    return parser.parse_args(namespace=Arguments())


def setup_logging() -> None:
    """Install a colored stream handler on the root logger.

    The handler's level comes from the ``LOG_LEVEL`` environment
    variable (default ``DEBUG``).
    """
    handler = colorlog.StreamHandler()
    handler.setFormatter(
        colorlog.ColoredFormatter(
            '%(log_color)s%(levelname)8s%(reset)s '
            '%(bold_white)s%(name)s%(reset)s '
            '%(message)s',
            log_colors={
                'TRACE': 'purple',
                'DEBUG': 'blue',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
            },
        )
    )
    handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG').upper())
    logging.root.addHandler(handler)
    logging.root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)


def main() -> None:
    """Entry point: update the selected projects and open pull requests."""
    setup_logging()
    args = parse_args()

    yaml = ruamel.yaml.YAML()
    with args.config.open('rb') as f:
        data = yaml.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    # With no projects named on the command line, update all of them.
    projects = args.projects or [p.name for p in config.projects]
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        d = Path(d)
        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(repo_url, d, depth=1, b=config.repo.branch)
        for project in config.projects:
            # Skip unselected projects before touching the work tree.
            if project.name not in projects:
                continue
            branch_name = f'updatebot/{project.name}'
            log.debug('Checking out new branch: %s', branch_name)
            # Always branch from the configured base branch; indexing
            # heads by position is unreliable once other local branches
            # exist.
            repo.heads[config.repo.branch].checkout(
                force=True, B=branch_name
            )
            description = None
            title, diff = update_project(repo, project)
            if not title:
                log.info('No changes made')
                continue
            if diff:
                # Collapsible section holding the resource diff.
                description = (
                    '<details>\n<summary>Resource diff</summary>\n\n'
                    f'```diff\n{diff}```\n'
                    '</details>'
                )
            if not args.dry_run:
                repo.head.reference.set_tracking_branch(
                    git.RemoteReference(
                        repo, f'refs/remotes/origin/{branch_name}'
                    )
                )
                repo.remote().push(force=True)
                config.repo.create_pr(
                    title,
                    branch_name,
                    config.repo.branch,
                    description,
                )
            else:
                print(
                    'Would create PR',
                    f'{branch_name} → {config.repo.branch}:',
                    title,
                )
                print(description or '')


if __name__ == '__main__':
    main()