#!/usr/bin/env python import abc import argparse import logging import functools import os import re import tempfile import threading import urllib.parse from pathlib import Path from typing import ClassVar, Iterable, Literal, Union, Optional import colorlog import pydantic import git import requests import ruamel.yaml log = logging.getLogger('updatebot') TRACE = 5 logging.addLevelName(TRACE, 'TRACE') XDG_CONFIG_HOME = ( Path(os.environ['XDG_CONFIG_HOME']) if 'XDG_CONFIG_HOME' in os.environ else Path('~/.config').expanduser() ) tls = threading.local() def _get_session() -> requests.Session: if hasattr(tls, 'session'): session = tls.session else: log.debug('Starting new HTTP/HTTPS client session') session = tls.session = requests.Session() return session class BaseSource(abc.ABC, pydantic.BaseModel): @abc.abstractmethod def get_latest_version(self) -> str: raise NotImplementedError class GithubSource(BaseSource): kind: Literal['github'] organization: str repo: str version_re: str = r'(?P[0-9]+(\.[0-9]+)*(-[0-9]+)?)' def get_latest_version(self) -> str: session = _get_session() url = ( f'https://api.github.com/repos/' f'{self.organization}/{self.repo}/releases/latest' ) r = session.get(url) release = r.json() m = re.search(self.version_re, release['name']) if not m: log.warning( 'Release name "%s" did not match regular expression "%s"', release['name'], self.version_re, ) return release['name'] return m.groupdict()['version'] class DockerHubSource(BaseSource): kind: Literal['docker'] namespace: str repository: str version_re: str = r'^(?P[0-9]+(\.[0-9]+)*(-[0-9]+)?)$' def get_latest_version(self) -> str: session = _get_session() url = ( f'https://hub.docker.com/v2/' f'namespaces/{self.namespace}/repositories/{self.repository}/tags' ) r = session.get(url) data = r.json() versions = [] regex = re.compile(self.version_re) for result in data['results']: m = regex.match(result['name']) if not m: log.debug( 'Skipping tag %s: does not match regex %s', result['name'], self.version_re, ) continue version = m.groupdict()['version'] versions.append((result['last_updated'], version)) versions.sort() return versions[-1][-1] Source = Union[ GithubSource, DockerHubSource, ] class ImageDef(pydantic.BaseModel): name: str image: str source: Source tag_format: str = '{version}' class BaseProject(abc.ABC, pydantic.BaseModel): name: str path: Path def __init__(self, **data) -> None: data.setdefault('path', data.get('name')) super().__init__(**data) @abc.abstractmethod def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]: raise NotImplementedError class KustomizeProject(BaseProject): kind: Literal['kustomize'] images: list[ImageDef] kustomize_files: ClassVar[list[str]] = [ 'kustomization.yaml', 'kustomization.yml', 'Kustomization', ] def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]: path = basedir / self.path for filename in self.kustomize_files: filepath = path / filename if filepath.is_file(): break else: raise ValueError( f'Could not find Kustomize config for {self.name}' ) for image in self.images: yaml = ruamel.yaml.YAML() with filepath.open('rb') as f: kustomization = yaml.load(f) images = kustomization.setdefault('images', []) version = image.source.get_latest_version() new_tag = image.tag_format.format(version=version) for i in images: if i['name'] == image.image: i['newTag'] = new_tag break else: images.append({'name': image.image, 'newTag': new_tag}) with filepath.open('wb') as f: yaml.dump(kustomization, f) yield (image, version) Project = KustomizeProject class RepoConfig(pydantic.BaseModel): url: str token_file: Optional[Path] = None branch: str = 'master' @functools.cached_property def repo_api_url(self) -> str: urlparts = urllib.parse.urlsplit(self.url) urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}') return urllib.parse.urlunsplit(urlparts) @functools.cached_property def auth_token(self) -> Optional[str]: if self.token_file: return self.token_file.read_text().strip() def get_git_url(self) -> str: session = _get_session() headers = {} if token := self.auth_token: headers['Authorization'] = f'Bearer {token}' r = session.get( self.repo_api_url, headers=headers, ) data = r.json() if ssh_url := data.get('ssh_url'): return ssh_url return data['clone_url'] def create_pr( self, title: str, source_branch: str, target_branch: str ) -> None: session = _get_session() r = session.post( f'{self.repo_api_url}/pulls', headers={ 'Authorization': f'token {self.auth_token}', }, json={ 'title': title, 'base': target_branch, 'head': source_branch, }, ) log.log(TRACE, '%r', r.content) if 300 > r.status_code >= 200: data = r.json() log.info('Created pull request: %s', data['url']) elif r.status_code == 409: data = r.json() log.warning('%s', data['message']) elif r.status_code < 500: data = r.json() log.error('Failed to create PR: %s', data['message']) else: log.error('Failed to create PR: %r', r.content) class Config(pydantic.BaseModel): repo: RepoConfig projects: list[Project] class Arguments: config: Path branch_name: str dry_run: bool projects: list[str] def update_project(repo: git.Repo, project: Project) -> Optional[str]: basedir = Path(repo.working_dir) title = None for (image, version) in project.apply_updates(basedir): log.info('Updating %s to %s', image.name, version) if repo.index.diff(None): log.debug('Committing changes to %s', project.path) repo.index.add(str(project.path)) c = repo.index.commit(f'{image.name}: Update to {version}') log.info('Commited %s %s', str(c)[:7], c.summary) if not title: if not isinstance(c.summary, str): title = bytes(c.summary).decode('utf-8') else: title = c.summary else: log.info('No changes to commit') return title def parse_args() -> Arguments: parser = argparse.ArgumentParser() parser.add_argument( '--config', '-c', type=Path, default=XDG_CONFIG_HOME / 'updatebot' / 'config.toml', ) parser.add_argument('--branch-name', '-b', default='updatebot') parser.add_argument('--dry-run', '-n', action='store_true', default=False) parser.add_argument('projects', metavar='project', nargs='*', default=[]) return parser.parse_args(namespace=Arguments()) def setup_logging() -> None: handler = colorlog.StreamHandler() handler.setFormatter( colorlog.ColoredFormatter( '%(log_color)s%(levelname)8s%(reset)s ' '%(bold_white)s%(name)s%(reset)s ' '%(message)s', log_colors={ 'TRACE': 'purple', 'DEBUG': 'blue', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red', }, ) ) handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG').upper()) logging.root.addHandler(handler) logging.root.setLevel(logging.DEBUG) log.setLevel(TRACE) def main() -> None: setup_logging() args = parse_args() yaml = ruamel.yaml.YAML() with args.config.open('rb') as f: data = yaml.load(f) config = Config.model_validate(data) log.debug('Using configuration: %s', config) projects = args.projects or [p.name for p in config.projects] if log.isEnabledFor(logging.INFO): log.info('Updating projects: %s', ', '.join(projects)) with tempfile.TemporaryDirectory(prefix='updatebot.') as d: log.debug('Using temporary working directory: %s', d) d = Path(d) log.debug('Retreiving repository Git URL') repo_url = config.repo.get_git_url() repo = git.Repo.clone_from(repo_url, d, depth=1, b=config.repo.branch) for project in config.projects: log.debug('Checking out new branch: %s', args.branch_name) repo.heads[0].checkout(force=True, B=args.branch_name) title = None if project.name not in projects: continue title = update_project(repo, project) if not title: log.info('No changes made') continue repo.head.reference.set_tracking_branch( git.RemoteReference( repo, f'refs/remotes/origin/{args.branch_name}' ) ) if not args.dry_run: repo.remote().push(force=True) config.repo.create_pr( title, args.branch_name, config.repo.branch ) else: log.info( 'Would create PR %s → %s: %s', config.repo.branch, args.branch_name, title, ) if __name__ == '__main__': main()