updatebot/updatebot.py

329 lines
9.2 KiB
Python

#!/usr/bin/env python
import abc
import argparse
import logging
import functools
import os
import re
import tempfile
import threading
import tomllib
import urllib.parse
from pathlib import Path
from typing import ClassVar, Literal, Union, Optional
import colorlog
import pydantic
import git
import requests
import ruamel.yaml
# Logger shared by everything in this module.
log = logging.getLogger('updatebot')
# Custom level number 5, registered under the name 'TRACE' so it can be used
# with log.log(TRACE, ...) and referenced by name when setting levels.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')
# Base directory for user configuration. Uses $XDG_CONFIG_HOME when it is set
# to a non-empty value and falls back to ~/.config otherwise. An empty value is
# treated like an unset one; without that, Path('') would silently point at
# the current working directory.
XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if os.environ.get('XDG_CONFIG_HOME')
    else Path('~/.config').expanduser()
)
# Thread-local storage; _get_session() keeps its requests.Session here so each
# thread gets its own session object.
tls = threading.local()
def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use."""
    try:
        return tls.session
    except AttributeError:
        # First call on this thread: nothing has been stored yet.
        log.debug('Starting new HTTP/HTTPS client session')
        tls.session = requests.Session()
        return tls.session
class BaseSource(abc.ABC, pydantic.BaseModel):
    """Abstract base for the places a project's latest version is looked up."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the latest available version as a string."""
        raise NotImplementedError
class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    kind: Literal['github']
    # Owner and repository name, as they appear in the GitHub API path.
    organization: str
    repo: str
    # Pattern searched for in the release name; it must define a named
    # group called ``version``.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version extracted from the latest release's name.

        Falls back to the release's ``tag_name`` when the release has no
        name. If the name does not match ``version_re`` a warning is logged
        and the name is returned unchanged.

        Raises:
            requests.HTTPError: if the GitHub API responds with an error
                status.
        """
        session = _get_session()
        url = (
            'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Fail loudly on 4xx/5xx instead of hitting a KeyError on the
        # error payload further down.
        r.raise_for_status()
        release = r.json()
        # A release may have no name (null); re.search() would raise
        # TypeError on None, so use the tag name in that case.
        name = release.get('name') or release['tag_name']
        m = re.search(self.version_re, name)
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                name,
                self.version_re,
            )
            return name
        return m.groupdict()['version']
class DockerHubSource(BaseSource):
    """Version source backed by the tag list of a Docker Hub repository."""

    kind: Literal['docker']
    # Namespace and repository name, as they appear in the Docker Hub API path.
    namespace: str
    repository: str

    def get_latest_version(self) -> str:
        """Return the name of the most recently updated tag.

        The tag called ``latest`` is ignored. Only the results contained in
        the single API response are considered; no further pages are
        requested.

        Raises:
            requests.HTTPError: if Docker Hub responds with an error status.
            ValueError: if the response contains no tag other than
                ``latest``.
        """
        session = _get_session()
        url = (
            'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        r.raise_for_status()
        data = r.json()
        # (timestamp, name) pairs; a missing or null timestamp becomes '' so
        # the tuple comparison never mixes None with str.
        candidates = [
            (result.get('last_updated') or '', result['name'])
            for result in data['results']
            if result['name'] != 'latest'
        ]
        if not candidates:
            raise ValueError(
                'No tags other than "latest" found for '
                f'{self.namespace}/{self.repository}'
            )
        # Largest timestamp wins; ties are broken by tag name.
        return max(candidates)[1]
# Every concrete source type accepted for a project's ``source`` field.
Source = Union[
    GithubSource,
    DockerHubSource,
]
class BaseProject(abc.ABC, pydantic.BaseModel):
    """Abstract base for a project whose image version can be updated."""

    # Location of the project relative to the repository root; when unset,
    # update_project() uses the project's name instead.
    path: Optional[Path] = None
    # Where the latest version is looked up.
    source: Source
    # Image name the update applies to.
    image: str
    # Template for the tag to write; ``{version}`` is replaced with the
    # version reported by the source.
    tag_format: str = '{version}'

    @abc.abstractmethod
    def apply_update(self, path: Path, version: str) -> None:
        """Modify the project found at *path* so that it uses *version*."""
        raise NotImplementedError
class KustomizeProject(BaseProject):
    """Project whose image tag is pinned in a Kustomize ``images`` list."""

    kind: Literal['kustomize']
    # File names probed, in order, inside the project directory.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_update(self, path: Path, version: str) -> None:
        """Set ``newTag`` for this project's image in the kustomization file.

        The first existing file from ``kustomize_files`` under *path* is
        used. If none exists the first candidate name is used, and opening
        it raises ``FileNotFoundError``.

        The entry in ``images`` whose ``name`` equals ``self.image`` gets its
        ``newTag`` replaced; if there is no such entry, one is appended. The
        file is then rewritten in place.
        """
        for filename in self.kustomize_files:
            filepath = path / filename
            if filepath.is_file():
                break
        else:
            filepath = path / self.kustomize_files[0]
        yaml = ruamel.yaml.YAML()
        with filepath.open('rb') as f:
            kustomization = yaml.load(f)
        images = kustomization.setdefault('images', [])
        new_tag = self.tag_format.format(version=version)
        for image in images:
            # Kustomize identifies an image entry by its ``name`` field.
            if image.get('name') == self.image:
                image['newTag'] = new_tag
                break
        else:
            images.append({'name': self.image, 'newTag': new_tag})
        with filepath.open('wb') as f:
            yaml.dump(kustomization, f)
class DirectoryProject(BaseProject):
    """Project selected by ``kind = 'dir'`` or ``kind = 'directory'``."""

    # NOTE(review): no apply_update() override is visible here, while
    # BaseProject declares it abstract -- presumably this type cannot be
    # instantiated yet; confirm whether the implementation is still pending.
    kind: Literal['dir'] | Literal['directory']
# Every concrete project type accepted in the ``projects`` configuration table.
Project = Union[
    KustomizeProject,
    DirectoryProject,
]
class RepoConfig(pydantic.BaseModel):
    """Settings and API helpers for the repository that receives updates.

    NOTE(review): the ``/api/v1/repos`` path and ``token`` authorization
    scheme look like a Gitea-compatible API -- confirm.
    """

    # Web URL of the repository; the API URL is derived from it.
    url: str
    # File whose contents (whitespace-stripped) are sent as the API token.
    token_file: Path

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository: ``url`` with ``/api/v1/repos`` prefixed
        to its path."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> str:
        """API token read from ``token_file`` (read once, then cached)."""
        return self.token_file.read_text().strip()

    def get_git_url(self) -> str:
        """Return the URL to clone the repository from.

        Prefers the ``ssh_url`` reported by the API and falls back to
        ``clone_url`` when it is missing or empty.

        Raises:
            requests.HTTPError: if the API responds with an error status.
        """
        session = _get_session()
        r = session.get(
            self.repo_api_url,
            headers={
                'Authorization': f'token {self.auth_token}',
            },
        )
        # Surface authentication or lookup failures directly rather than as
        # a KeyError on the error payload.
        r.raise_for_status()
        data = r.json()
        if ssh_url := data.get('ssh_url'):
            return ssh_url
        return data['clone_url']

    def create_pr(
        self, title: str, source_branch: str, target_branch: str
    ) -> None:
        """Open a pull request from *source_branch* into *target_branch*.

        The outcome is only logged; a failed request does not raise. A 409
        response is logged as a warning, other failures as errors.
        """
        session = _get_session()
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers={
                'Authorization': f'token {self.auth_token}',
            },
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 200 <= r.status_code < 300:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
            return
        if r.status_code < 500:
            # Error bodies are not guaranteed to be JSON with a 'message'
            # key; fall through to logging the raw body when they are not.
            try:
                message = r.json()['message']
            except (ValueError, KeyError, TypeError):
                message = None
            if message is not None:
                if r.status_code == 409:
                    log.warning('%s', message)
                else:
                    log.error('Failed to create PR: %s', message)
                return
        log.error('Failed to create PR: %r', r.content)
class Config(pydantic.BaseModel):
    """Top-level structure the configuration file is validated against."""

    # Repository that updates are committed to.
    repo: RepoConfig
    # Project definitions, keyed by project name.
    projects: dict[str, Project]
class Arguments:
    """Typed namespace that parse_args() fills with command-line values."""

    # Path of the configuration file (--config / -c).
    config: Path
    # Name of the branch to create and push (--branch-name / -b).
    branch_name: str
    # Project names given as positional arguments; may be empty.
    projects: list[str]
def update_project(repo: git.Repo, name: str, project: Project) -> git.Commit:
    """Update one project to its latest version and commit the result.

    Looks up the latest version from the project's source, applies it to the
    project's directory inside the working tree of *repo*, stages that
    directory and creates a commit.

    Args:
        repo: Repository whose working tree contains the project.
        name: Project name; also used as the directory name when the project
            has no explicit ``path``.
        project: Project definition to update.

    Returns:
        The newly created commit.
    """
    basedir = Path(repo.working_dir)
    log.debug('Checking for latest version of %s', name)
    latest = project.source.get_latest_version()
    log.info('Found version %s for %s', latest, name)
    path = basedir / (project.path or name)
    log.debug('Applying update for %s version %s', name, latest)
    project.apply_update(path, latest)
    # Logged only once the update has been applied, so the debug trace
    # matches what is actually happening.
    log.debug('Committing changes to %s', path)
    repo.index.add(str(path))
    c = repo.index.commit(f'{name}: Update to {latest}')
    log.info('Commited %s %s', str(c)[:7], c.summary)
    return c
def parse_args() -> Arguments:
    """Parse the command line into an ``Arguments`` instance.

    Options:
        --config / -c: configuration file path; defaults to
            ``updatebot/config.toml`` under ``XDG_CONFIG_HOME``.
        --branch-name / -b: branch name; defaults to ``updatebot``.
        project ...: zero or more project names.
    """
    default_config = XDG_CONFIG_HOME / 'updatebot' / 'config.toml'
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', '-c', type=Path, default=default_config)
    parser.add_argument('--branch-name', '-b', default='updatebot')
    parser.add_argument('projects', metavar='project', nargs='*', default=[])
    # argparse populates and returns the namespace object it is given.
    namespace = Arguments()
    parser.parse_args(namespace=namespace)
    return namespace
def setup_logging() -> None:
    """Install a colored stream handler on the root logger.

    The handler's level comes from the ``LOG_LEVEL`` environment variable
    (default ``DEBUG``). The root logger is set to DEBUG and the module
    logger to TRACE.
    """
    level_colors = {
        'TRACE': 'purple',
        'DEBUG': 'blue',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
    line_format = (
        '%(log_color)s%(levelname)8s%(reset)s '
        '%(bold_white)s%(name)s%(reset)s '
        '%(message)s'
    )
    formatter = colorlog.ColoredFormatter(line_format, log_colors=level_colors)

    handler = colorlog.StreamHandler()
    handler.setFormatter(formatter)
    handler.setLevel(os.environ.get('LOG_LEVEL', 'DEBUG'))

    root = logging.root
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)
def main() -> None:
    """Entry point: update the selected projects and open a pull request.

    Loads the TOML configuration, selects the projects to update (all
    configured projects unless names are given on the command line), clones
    the repository into a temporary directory, commits one update per
    project on a new branch, force-pushes that branch and creates a pull
    request against the branch that was cloned.

    Raises:
        SystemExit: with status 1 if no known project was selected or the
            push failed.
    """
    setup_logging()
    args = parse_args()
    with args.config.open('rb') as f:
        data = tomllib.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)
    all_projects = list(config.projects.keys())
    projects = []
    if args.projects:
        for project in args.projects:
            if project not in config.projects:
                log.warning('Unknown project: %s', project)
                continue
            projects.append(project)
    else:
        projects = all_projects
    if not projects:
        log.error('No projects')
        raise SystemExit(1)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))
    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        workdir = Path(d)
        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(repo_url, workdir, depth=1)
        # Remember which branch was cloned: the new branch is created from
        # it, so it is also the right base for the pull request (rather
        # than assuming the repository's default branch is 'master').
        base_head = repo.heads[0]
        target_branch = base_head.name
        log.debug('Checking out new branch: %s', args.branch_name)
        base_head.checkout(force=True, B=args.branch_name)
        title = None
        for project in projects:
            commit = update_project(repo, project, config.projects[project])
            if not title:
                # The first commit's summary becomes the PR title; decode it
                # if it is not already a str.
                if not isinstance(commit.summary, str):
                    title = bytes(commit.summary).decode(
                        'utf-8', errors='replace'
                    )
                else:
                    title = commit.summary
        if not title:
            return
        repo.head.reference.set_tracking_branch(
            git.RemoteReference(
                repo, f'refs/remotes/origin/{args.branch_name}'
            )
        )
        # push() reports failure through its return value instead of
        # raising: an empty result means the push failed entirely, and
        # rejected refs carry the ERROR flag.
        push_results = repo.remote().push(force=True)
        if not push_results or any(
            info.flags & git.PushInfo.ERROR for info in push_results
        ):
            log.error('Failed to push branch %s', args.branch_name)
            raise SystemExit(1)
        config.repo.create_pr(title, args.branch_name, target_branch)
# Run the updater only when this file is executed as a script.
if __name__ == '__main__':
    main()