#!/usr/bin/env python
"""Bump image versions in a Git repository and open a pull request.

The script reads a TOML configuration file describing a repository and a
set of projects.  For every selected project it looks up the latest
upstream version (GitHub release or Docker Hub tag), rewrites the image
tag in the project's kustomization file, and commits the change on a
dedicated branch.  Unless ``--dry-run`` is given, the branch is then
force-pushed and a pull request is created through the repository's
``/api/v1/repos/...`` HTTP API.
"""
import abc
import argparse
import logging
import functools
import os
import re
import subprocess
import tempfile
import threading
import tomllib
import urllib.parse
from pathlib import Path
from typing import ClassVar, Literal, Union, Optional

import colorlog
import pydantic
import git
import requests
import ruamel.yaml


log = logging.getLogger('updatebot')

# Custom level below DEBUG; used to dump raw HTTP response bodies.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

# Base directory for the default configuration file location.
XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if 'XDG_CONFIG_HOME' in os.environ
    else Path('~/.config').expanduser()
)

# Holds one HTTP session per thread (see _get_session).
tls = threading.local()


def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use."""
    if hasattr(tls, 'session'):
        session = tls.session
    else:
        log.debug('Starting new HTTP/HTTPS client session')
        session = tls.session = requests.Session()
    return session


class BaseSource(abc.ABC, pydantic.BaseModel):
    """Interface for anything that can report a latest upstream version."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the latest available version string."""
        raise NotImplementedError


class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    kind: Literal['github']
    organization: str
    repo: str
    # The pattern must define a named group ``version``: that group is what
    # get_latest_version() extracts from the release name.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version parsed from the latest release's name.

        The release name is searched with ``version_re``.  If it does not
        match, a warning is logged and the raw release name is returned.

        Raises:
            requests.HTTPError: if the API responds with an error status.
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Fail with the HTTP error itself rather than a KeyError on the
        # error payload below.
        r.raise_for_status()
        release = r.json()
        m = re.search(self.version_re, release['name'])
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                release['name'],
                self.version_re,
            )
            return release['name']
        return m.groupdict()['version']


class DockerHubSource(BaseSource):
    """Version source backed by the tags of a Docker Hub repository."""

    kind: Literal['docker']
    namespace: str
    repository: str
    # Anchored so that only tags consisting solely of a version match.  The
    # named group ``version`` is what get_latest_version() extracts.
    version_re: str = r'^(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)$'

    def get_latest_version(self) -> str:
        """Return the matching tag with the greatest ``last_updated`` value.

        Only the results contained in the single response to the tag
        listing request are examined.  Tags that do not match
        ``version_re`` are skipped.

        Raises:
            requests.HTTPError: if the API responds with an error status.
            ValueError: if none of the returned tags match ``version_re``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        r.raise_for_status()
        data = r.json()
        versions = []
        regex = re.compile(self.version_re)
        for result in data['results']:
            m = regex.match(result['name'])
            if not m:
                log.debug(
                    'Skipping tag %s: does not match regex %s',
                    result['name'],
                    self.version_re,
                )
                continue
            version = m.groupdict()['version']
            versions.append((result['last_updated'], version))
        if not versions:
            raise ValueError(
                f'No tags of {self.namespace}/{self.repository} '
                f'match regular expression "{self.version_re}"'
            )
        # Tuples compare by last_updated first, so max() picks the most
        # recently updated matching tag.
        return max(versions)[1]


Source = Union[
    GithubSource,
    DockerHubSource,
]


class BaseProject(abc.ABC, pydantic.BaseModel):
    """Common configuration and interface for an updatable project."""

    # Directory of the project relative to the repository root; when unset,
    # update_project() uses the project's name instead.
    path: Optional[Path] = None
    source: Source
    image: str
    # Template for the image tag; ``{version}`` is replaced by the version.
    tag_format: str = '{version}'

    @abc.abstractmethod
    def apply_update(self, path: Path, version: str) -> None:
        """Rewrite the project's files under *path* to use *version*."""
        raise NotImplementedError

    @abc.abstractmethod
    def diff(self, path: Path) -> str:
        """Return a textual diff describing the effect of the update."""
        raise NotImplementedError


class KustomizeProject(BaseProject):
    """Project whose image tag is set in a kustomization file."""

    kind: Literal['kustomize']

    # Candidate file names, checked in this order.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_update(self, path: Path, version: str) -> None:
        """Set ``newTag`` for this project's image in the kustomization file.

        The first existing file from ``kustomize_files`` is used; if none
        exists, the first name is assumed (and opening it will fail).  An
        entry in ``images`` whose ``name`` equals ``self.image`` gets its
        ``newTag`` replaced; if there is no such entry, one is appended.
        """
        for filename in self.kustomize_files:
            filepath = path / filename
            if filepath.is_file():
                break
        else:
            filepath = path / self.kustomize_files[0]
        yaml = ruamel.yaml.YAML()
        with filepath.open('rb') as f:
            kustomization = yaml.load(f)
        images = kustomization.setdefault('images', [])
        new_tag = self.tag_format.format(version=version)
        for image in images:
            if image['name'] == self.image:
                image['newTag'] = new_tag
                break
        else:
            images.append({'name': self.image, 'newTag': new_tag})
        with filepath.open('wb') as f:
            yaml.dump(kustomization, f)

    def diff(self, path: Path) -> str:
        """Return the output of ``kubectl diff -k .`` run in *path*.

        Returns an empty string when ``kubectl`` cannot be executed or when
        it fails without producing any output.
        """
        try:
            p = subprocess.run(
                ['kubectl', 'diff', '-k', '.'],
                cwd=path,
                check=False,
                capture_output=True,
                stdin=subprocess.DEVNULL,
                encoding='utf-8',
            )
        except FileNotFoundError as e:
            log.warning('Cannot compute manifest diff: %s', e)
            return ''
        # A non-zero exit status alone is not treated as failure: output on
        # stdout is still returned.  (kubectl diff is documented to exit
        # with status 1 when differences are found.)
        if not p.stdout and p.returncode != 0:
            log.error('Error computing manifest diff: %s', p.stderr)
            return ''
        return p.stdout


class DirectoryProject(BaseProject):
    """Project identified by kind ``dir`` or ``directory``."""

    # NOTE(review): this class does not implement apply_update() or diff(),
    # both abstract in BaseProject, so it appears it cannot be instantiated
    # as written -- confirm whether an implementation is missing.
    kind: Literal['dir'] | Literal['directory']


Project = Union[
    KustomizeProject,
    DirectoryProject,
]


class RepoConfig(pydantic.BaseModel):
    """Location of, and credentials for, the repository to update."""

    url: str
    # File containing the API token; surrounding whitespace is stripped.
    token_file: Optional[Path] = None
    # Branch that is cloned and that pull requests target.
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository: ``url`` with ``/api/v1/repos`` prefixed
        to its path."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> Optional[str]:
        """Contents of ``token_file``, or ``None`` if no file is configured."""
        if self.token_file:
            return self.token_file.read_text().strip()
        return None

    def get_git_url(self) -> str:
        """Return the URL to clone the repository from.

        The repository's API record is fetched; its ``ssh_url`` is
        preferred and ``clone_url`` is used when that is missing or empty.

        Raises:
            requests.HTTPError: if the API responds with an error status.
        """
        session = _get_session()
        headers = {}
        if token := self.auth_token:
            headers['Authorization'] = f'Bearer {token}'
        r = session.get(
            self.repo_api_url,
            headers=headers,
        )
        r.raise_for_status()
        data = r.json()
        if ssh_url := data.get('ssh_url'):
            return ssh_url
        return data['clone_url']

    def create_pr(
        self,
        title: str,
        source_branch: str,
        target_branch: str,
        body: Optional[str] = None,
    ) -> None:
        """Create a pull request from *source_branch* into *target_branch*.

        The outcome is only logged; no exception is raised for an error
        response from the API.
        """
        session = _get_session()
        # Only send credentials when a token is configured, instead of the
        # literal string "token None".
        headers = {}
        if token := self.auth_token:
            headers['Authorization'] = f'token {token}'
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers=headers,
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
                'body': body,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 300 > r.status_code >= 200:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
        elif r.status_code == 409:
            # Logged as a warning rather than an error.
            # NOTE(review): presumably a PR for this branch already
            # exists -- confirm against the API's documentation.
            data = r.json()
            log.warning('%s', data['message'])
        elif r.status_code < 500:
            data = r.json()
            log.error('Failed to create PR: %s', data['message'])
        else:
            log.error('Failed to create PR: %r', r.content)


class Config(pydantic.BaseModel):
    """Top-level configuration: one repository and its named projects."""

    repo: RepoConfig
    projects: dict[str, Project]


class Arguments:
    """Typed namespace that parse_args() fills with command-line values."""

    config: Path
    branch_name: str
    dry_run: bool
    projects: list[str]


def update_project(
    repo: git.Repo, name: str, project: Project
) -> tuple[Optional[git.Commit], Optional[str]]:
    """Update one project to its latest version and commit the result.

    Args:
        repo: Working clone in which the change is made.
        name: Project name; used in log and commit messages and, when
            ``project.path`` is unset, as the project directory.
        project: Project configuration.

    Returns:
        ``(commit, diff)`` when the update changed the working tree, or
        ``(None, None)`` when there was nothing to commit.
    """
    basedir = Path(repo.working_dir)
    log.debug('Checking for latest version of %s', name)
    latest = project.source.get_latest_version()
    log.info('Found version %s for %s', latest, name)
    log.debug('Applying update for %s version %s', name, latest)
    path = basedir / (project.path or name)
    project.apply_update(path, latest)
    # Compare the index against the working tree to see whether the update
    # modified anything.
    if repo.index.diff(None):
        log.debug('Committing changes to %s', path)
        repo.index.add(str(path))
        c = repo.index.commit(f'{name}: Update to {latest}')
        log.info('Commited %s %s', str(c)[:7], c.summary)
        log.debug('Computing manifest diff')
        diff = project.diff(path)
        return (c, diff)
    else:
        log.info('No changes to commit')
        return (None, None)


def parse_args() -> Arguments:
    """Parse the command line into an Arguments namespace."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--config',
        '-c',
        type=Path,
        default=XDG_CONFIG_HOME / 'updatebot' / 'config.toml',
    )
    parser.add_argument('--branch-name', '-b', default='updatebot')
    parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    parser.add_argument('projects', metavar='project', nargs='*', default=[])
    return parser.parse_args(namespace=Arguments())


def setup_logging() -> None:
    """Attach a colorized stream handler to the root logger.

    The handler's level comes from the ``LOG_LEVEL`` environment variable
    (default ``DEBUG``).  The root logger is set to DEBUG and this
    module's logger to TRACE, so the handler level decides what is shown.
    """
    handler = colorlog.StreamHandler()
    handler.setFormatter(
        colorlog.ColoredFormatter(
            '%(log_color)s%(levelname)8s%(reset)s '
            '%(bold_white)s%(name)s%(reset)s '
            '%(message)s',
            log_colors={
                'TRACE': 'purple',
                'DEBUG': 'blue',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
            },
        )
    )
    handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG').upper())
    logging.root.addHandler(handler)
    logging.root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)


def main() -> None:
    """Script entry point: update the selected projects and open a PR.

    Exits with status 1 when no known project is selected.  With
    ``--dry-run`` nothing is pushed and the collected diff is printed.
    """
    setup_logging()
    args = parse_args()
    with args.config.open('rb') as f:
        data = tomllib.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    # Select the projects named on the command line, or all of them.
    all_projects = list(config.projects.keys())
    projects = []
    if args.projects:
        for project in args.projects:
            if project not in all_projects:
                log.warning('Unknown project: %s', project)
                continue
            projects.append(project)
    else:
        projects = all_projects
    if not projects:
        log.error('No projects')
        raise SystemExit(1)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        d = Path(d)
        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(repo_url, d, depth=1, b=config.repo.branch)
        log.debug('Checking out new branch: %s', args.branch_name)
        repo.heads[0].checkout(force=True, B=args.branch_name)

        # The PR title is the summary of the first commit made; the body
        # is the concatenation of every project's manifest diff.
        title = None
        diffs = []
        for project in projects:
            commit, diff = update_project(
                repo, project, config.projects[project]
            )
            if commit and not title:
                if not isinstance(commit.summary, str):
                    title = bytes(commit.summary).decode(
                        'utf-8', errors='replace'
                    )
                else:
                    title = commit.summary
            if diff:
                diffs.append(diff)
        pr_desc = ''.join(diffs)

        if not title:
            log.info('No changes made')
            return

        if not args.dry_run:
            repo.head.reference.set_tracking_branch(
                git.RemoteReference(
                    repo, f'refs/remotes/origin/{args.branch_name}'
                )
            )
            repo.remote().push(force=True)
            if pr_desc:
                # NOTE(review): this literal arrived with raw line breaks
                # inside the quotes and an empty trailing fragment, which
                # looks like stripped markup; it is restored here as a
                # collapsible <details> section -- confirm against the
                # upstream file.
                pr_desc = (
                    '<details>\n<summary>Manifest diff</summary>\n\n'
                    f'```diff\n{pr_desc}```\n'
                    '</details>'
                )
            config.repo.create_pr(
                title, args.branch_name, config.repo.branch, pr_desc
            )
        elif pr_desc:
            print(pr_desc)


if __name__ == '__main__':
    main()