#!/usr/bin/env python
"""Bump container image versions in a Git repository and open a pull request.

For each configured project the script looks up the latest upstream version
(GitHub latest release or Docker Hub tags), rewrites the image tag in the
project's kustomization file inside a fresh clone, commits the change on a
work branch, pushes that branch and requests a pull request through the
repository host's ``/api/v1/repos/...`` HTTP API.
"""
import abc
import argparse
import logging
import functools
import os
import re
import tempfile
import threading
import tomllib
import urllib.parse
from pathlib import Path
from typing import ClassVar, Literal, Union, Optional

import colorlog
import pydantic
import git
import requests
import ruamel.yaml


log = logging.getLogger('updatebot')

# Custom level below DEBUG (10), used for dumping raw HTTP response bodies.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

# Base directory for the default configuration file location.
XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if 'XDG_CONFIG_HOME' in os.environ
    else Path('~/.config').expanduser()
)

# Holds one requests.Session per thread (see _get_session).
tls = threading.local()


def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use."""
    if hasattr(tls, 'session'):
        session = tls.session
    else:
        log.debug('Starting new HTTP/HTTPS client session')
        session = tls.session = requests.Session()
    return session


class BaseSource(abc.ABC, pydantic.BaseModel):
    """Interface for anything that can report a latest upstream version."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the latest available version string."""
        raise NotImplementedError


class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    kind: Literal['github']
    organization: str
    repo: str
    # Must define a named group ``version``; get_latest_version reads it via
    # ``groupdict()['version']``.  The previous default lacked the group name
    # (``(?P[...``), which is not a valid pattern and made re.search raise.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version extracted from the latest release's name.

        The release name is searched with ``version_re``.  When it does not
        match, a warning is logged and the raw release name is returned.

        Raises:
            requests.HTTPError: If the GitHub API responds with an error
                status.
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Fail with the HTTP error itself instead of a KeyError on the
        # error payload, which has no 'name' key.
        r.raise_for_status()
        release = r.json()
        m = re.search(self.version_re, release['name'])
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                release['name'],
                self.version_re,
            )
            return release['name']
        return m.groupdict()['version']


class DockerHubSource(BaseSource):
    """Version source backed by the tag list of a Docker Hub repository."""

    kind: Literal['docker']
    namespace: str
    repository: str

    def get_latest_version(self) -> str:
        """Return the most recently updated tag other than ``latest``.

        Only the tags present in the ``results`` of the single response
        fetched here are considered.

        Raises:
            requests.HTTPError: If Docker Hub responds with an error status.
            ValueError: If the response lists no tag other than ``latest``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        r.raise_for_status()
        data = r.json()
        # (last_updated, name) pairs so ordering is by update time first.
        versions = [
            (result['last_updated'], result['name'])
            for result in data['results']
            if result['name'] != 'latest'
        ]
        if not versions:
            raise ValueError(
                f'No tags other than "latest" found for '
                f'{self.namespace}/{self.repository}'
            )
        return max(versions)[-1]


Source = Union[
    GithubSource,
    DockerHubSource,
]


class BaseProject(abc.ABC, pydantic.BaseModel):
    """Common configuration for a project whose image tag is kept updated."""

    # Location inside the repository; update_project falls back to the
    # project's name when this is unset.
    path: Optional[Path] = None
    source: Source
    # Image name whose tag is rewritten.
    image: str
    # str.format template receiving ``version``.
    tag_format: str = '{version}'

    @abc.abstractmethod
    def apply_update(self, path: Path, version: str) -> None:
        """Rewrite the files under *path* so they reference *version*."""
        raise NotImplementedError


class KustomizeProject(BaseProject):
    """Project whose image tag lives in a kustomization file."""

    kind: Literal['kustomize']

    # Candidate file names, tried in this order.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_update(self, path: Path, version: str) -> None:
        """Set ``newTag`` for this project's image in the kustomization file.

        The first existing candidate from ``kustomize_files`` under *path* is
        loaded, the entry in ``images`` whose ``name`` equals ``self.image``
        gets its ``newTag`` replaced (an entry is appended when none matches)
        and the document is written back to the same file.
        """
        for filename in self.kustomize_files:
            filepath = path / filename
            if filepath.is_file():
                break
        else:
            # NOTE(review): no candidate exists, so the default name is
            # selected, but the read below will then raise
            # FileNotFoundError -- confirm whether creating the file was
            # intended.
            filepath = path / self.kustomize_files[0]
        yaml = ruamel.yaml.YAML()
        with filepath.open('rb') as f:
            kustomization = yaml.load(f)
        images = kustomization.setdefault('images', [])
        new_tag = self.tag_format.format(version=version)
        for image in images:
            if image['name'] == self.image:
                image['newTag'] = new_tag
                break
        else:
            images.append({'name': self.image, 'newTag': new_tag})
        with filepath.open('wb') as f:
            yaml.dump(kustomization, f)


class DirectoryProject(BaseProject):
    """Project identified by kind ``dir`` or ``directory``."""

    # NOTE(review): no apply_update override is defined here, so this class
    # appears to remain abstract -- confirm whether an implementation is
    # missing.
    kind: Literal['dir'] | Literal['directory']


Project = Union[
    KustomizeProject,
    DirectoryProject,
]


class RepoConfig(pydantic.BaseModel):
    """Settings for the Git repository that receives the updates."""

    # Web URL of the repository; the API URL is derived from it.
    url: str
    # File whose (stripped) contents are used as the API token.
    token_file: Path
    # Branch that is cloned and used as the pull request target.
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository: ``url`` with ``/api/v1/repos`` prefixed
        to its path."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> str:
        """Token read from ``token_file`` with surrounding whitespace removed."""
        return self.token_file.read_text().strip()

    def get_git_url(self) -> str:
        """Return the clone URL reported by the API, preferring ``ssh_url``.

        Raises:
            requests.HTTPError: If the API responds with an error status.
        """
        session = _get_session()
        r = session.get(
            self.repo_api_url,
            headers={
                'Authorization': f'token {self.auth_token}',
            },
        )
        # An error payload carries neither URL key; surface the HTTP error
        # rather than a KeyError.
        r.raise_for_status()
        data = r.json()
        if ssh_url := data.get('ssh_url'):
            return ssh_url
        return data['clone_url']

    def create_pr(
        self, title: str, source_branch: str, target_branch: str
    ) -> None:
        """Request a pull request from *source_branch* into *target_branch*.

        The outcome is only logged; error responses do not raise.
        """
        session = _get_session()
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers={
                'Authorization': f'token {self.auth_token}',
            },
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 300 > r.status_code >= 200:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
        elif r.status_code == 409:
            # Reported as a warning rather than an error.
            data = r.json()
            log.warning('%s', data['message'])
        elif r.status_code < 500:
            data = r.json()
            log.error('Failed to create PR: %s', data['message'])
        else:
            # Server errors: log the raw body instead of parsing it as JSON.
            log.error('Failed to create PR: %r', r.content)


class Config(pydantic.BaseModel):
    """Top-level configuration loaded from the TOML file."""

    repo: RepoConfig
    # Keyed by project name.
    projects: dict[str, Project]


class Arguments:
    """Namespace populated by parse_args with the command-line options."""

    config: Path
    branch_name: str
    dry_run: bool
    projects: list[str]


def update_project(
    repo: git.Repo, name: str, project: Project
) -> Optional[git.Commit]:
    """Update one project in the working tree and commit the result.

    Args:
        repo: Clone in which the project lives.
        name: Project name; also the directory used when ``project.path`` is
            unset, and the prefix of the commit message.
        project: Project definition providing the source and the updater.

    Returns:
        The new commit, or ``None`` when the update changed nothing.
    """
    basedir = Path(repo.working_dir)
    log.debug('Checking for latest version of %s', name)
    latest = project.source.get_latest_version()
    log.info('Found version %s for %s', latest, name)
    log.debug('Applying update for %s version %s', name, latest)
    path = basedir / (project.path or name)
    project.apply_update(path, latest)
    # diff(None) compares the index against the working tree.
    if repo.index.diff(None):
        log.debug('Committing changes to %s', path)
        repo.index.add(str(path))
        c = repo.index.commit(f'{name}: Update to {latest}')
        log.info('Commited %s %s', str(c)[:7], c.summary)
        return c
    else:
        log.info('No changes to commit')
        return None


def parse_args() -> Arguments:
    """Parse the command line into an Arguments instance."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--config',
        '-c',
        type=Path,
        default=XDG_CONFIG_HOME / 'updatebot' / 'config.toml',
    )
    parser.add_argument('--branch-name', '-b', default='updatebot')
    parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    parser.add_argument('projects', metavar='project', nargs='*', default=[])
    return parser.parse_args(namespace=Arguments())


def setup_logging() -> None:
    """Attach a colorized stream handler to the root logger.

    The handler's threshold comes from the ``LOG_LEVEL`` environment variable
    (default ``DEBUG``).  The root logger is set to DEBUG and this module's
    logger to TRACE.
    """
    handler = colorlog.StreamHandler()
    handler.setFormatter(
        colorlog.ColoredFormatter(
            '%(log_color)s%(levelname)8s%(reset)s '
            '%(bold_white)s%(name)s%(reset)s '
            '%(message)s',
            log_colors={
                'TRACE': 'purple',
                'DEBUG': 'blue',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
            },
        )
    )
    handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG').upper())
    logging.root.addHandler(handler)
    logging.root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)


def main() -> None:
    """Run the updater: load config, update projects, push and open a PR."""
    setup_logging()
    args = parse_args()

    with args.config.open('rb') as f:
        data = tomllib.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    # Restrict to the projects named on the command line, if any; unknown
    # names are reported and skipped.
    all_projects = list(config.projects.keys())
    projects = []
    if args.projects:
        for project in args.projects:
            if project not in all_projects:
                log.warning('Unknown project: %s', project)
                continue
            projects.append(project)
    else:
        projects = all_projects
    if not projects:
        log.error('No projects')
        raise SystemExit(1)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        d = Path(d)
        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(repo_url, d, depth=1, b=config.repo.branch)
        log.debug('Checking out new branch: %s', args.branch_name)
        repo.heads[0].checkout(force=True, B=args.branch_name)

        # The summary of the first commit made becomes the PR title.
        title = None
        for project in projects:
            commit = update_project(repo, project, config.projects[project])
            if commit and not title:
                if not isinstance(commit.summary, str):
                    title = bytes(commit.summary).decode(
                        'utf-8', errors='replace'
                    )
                else:
                    title = commit.summary
        if not title:
            log.info('No changes made')
            return

        repo.head.reference.set_tracking_branch(
            git.RemoteReference(
                repo, f'refs/remotes/origin/{args.branch_name}'
            )
        )
        if not args.dry_run:
            repo.remote().push(force=True)
            config.repo.create_pr(title, args.branch_name, config.repo.branch)


if __name__ == '__main__':
    main()