updatebot/updatebot.py

393 lines
11 KiB
Python

#!/usr/bin/env python
import abc
import argparse
import logging
import functools
import os
import re
import subprocess
import tempfile
import threading
import urllib.parse
from pathlib import Path
from typing import ClassVar, Iterable, Literal, Union, Optional
import colorlog
import pydantic
import git
import requests
import ruamel.yaml
# Module-wide logger for this tool.
log = logging.getLogger('updatebot')

# Custom verbosity level, numerically below logging.DEBUG (10), registered
# under the name "TRACE".
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

# Base directory for per-user configuration files.  The XDG Base Directory
# specification treats an unset *or empty* $XDG_CONFIG_HOME identically, so an
# empty value must fall back to ~/.config rather than become a path relative
# to the current working directory.
XDG_CONFIG_HOME = Path(
    os.environ.get('XDG_CONFIG_HOME') or Path('~/.config').expanduser()
)

# Thread-local storage used to keep one HTTP session per thread.
tls = threading.local()
def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use.

    The session is stored on the thread-local ``tls`` object, so each thread
    that calls this function gets (and then reuses) its own session.
    """
    try:
        return tls.session
    except AttributeError:
        # First call on this thread: nothing has been stored yet.
        log.debug('Starting new HTTP/HTTPS client session')
        tls.session = requests.Session()
        return tls.session
class BaseSource(abc.ABC, pydantic.BaseModel):
    """Abstract pydantic model for anything that can report a latest version."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the most recent version string; subclasses must override."""
        raise NotImplementedError
class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    kind: Literal['github']
    organization: str
    repo: str
    # Pattern searched for (re.search) in the release name; it must define a
    # named group called "version".
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version of the repository's latest GitHub release.

        The release name is searched with ``version_re``.  When the pattern
        does not match, a warning is logged and the raw name is returned.

        Raises:
            requests.HTTPError: if the API answers with an error status
                (for example when the repository has no releases).
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Fail with the real HTTP error instead of a KeyError on the
        # error document further down.
        r.raise_for_status()
        release = r.json()
        # A release's "name" may be null/empty; its "tag_name" is the
        # fallback so re.search() never receives None.
        name = release.get('name') or release['tag_name']
        m = re.search(self.version_re, name)
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                name,
                self.version_re,
            )
            return name
        return m.groupdict()['version']
class DockerHubSource(BaseSource):
    """Version source backed by the tag list of a Docker Hub repository."""

    kind: Literal['docker']
    namespace: str
    repository: str
    # Pattern matched (re.match) against each tag name; it must define a
    # named group called "version".
    version_re: str = r'^(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)$'

    def get_latest_version(self) -> str:
        """Return the most recently updated tag that matches ``version_re``.

        Only the tags contained in the first response page are considered.
        NOTE(review): the registry paginates this listing; confirm whether a
        larger page size is needed for repositories with many tags.

        Raises:
            requests.HTTPError: if the API answers with an error status.
            ValueError: if none of the returned tags matches ``version_re``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        r.raise_for_status()
        data = r.json()
        versions = []
        regex = re.compile(self.version_re)
        for result in data['results']:
            m = regex.match(result['name'])
            if not m:
                log.debug(
                    'Skipping tag %s: does not match regex %s',
                    result['name'],
                    self.version_re,
                )
                continue
            version = m.groupdict()['version']
            # A missing/null timestamp is treated as "oldest" so that the
            # tuples below always stay mutually comparable.
            versions.append((result.get('last_updated') or '', version))
        if not versions:
            raise ValueError(
                f'No tag of {self.namespace}/{self.repository} '
                f'matches "{self.version_re}"'
            )
        # Tuples compare by timestamp first, then by version string.
        return max(versions)[-1]
# Any of the concrete version sources an image definition may use.
Source = Union[GithubSource, DockerHubSource]
class ImageDef(pydantic.BaseModel):
    """One container image whose tag is kept up to date."""

    # Human-readable name, used in log and commit messages.
    name: str
    # Image name as it appears in the Kustomize "images" list.
    image: str
    # Where the latest version is looked up.
    source: Source
    # str.format() template for the new tag; receives ``version``.
    tag_format: str = '{version}'
class BaseProject(abc.ABC, pydantic.BaseModel):
    """Abstract pydantic model for a project directory inside the repository."""

    name: str
    # Location of the project; falls back to ``name`` when not supplied.
    path: Path

    def __init__(self, **data) -> None:
        """Build the model, defaulting ``path`` to the value given as ``name``."""
        if 'path' not in data:
            data['path'] = data.get('name')
        super().__init__(**data)

    @abc.abstractmethod
    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Apply updates under *basedir*, yielding ``(image, version)`` pairs."""
        raise NotImplementedError

    @abc.abstractmethod
    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return a textual resource diff, or ``None`` if none is available."""
        raise NotImplementedError
class KustomizeProject(BaseProject):
    """Project whose image tags are recorded in a Kustomize config file."""

    kind: Literal['kustomize']
    images: list[ImageDef]
    # File names probed, in this order, inside the project directory.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Set ``newTag`` for every configured image, one image at a time.

        For each image the Kustomize file is loaded, the entry for the image
        is updated (or appended when absent), the file is written back and
        ``(image, version)`` is yielded.  The file is therefore already saved
        whenever control returns to the consumer of this generator.

        Raises:
            ValueError: if none of ``kustomize_files`` exists in the project
                directory.
        """
        project_dir = basedir / self.path
        candidates = (project_dir / fn for fn in self.kustomize_files)
        config_file = next((c for c in candidates if c.is_file()), None)
        if config_file is None:
            raise ValueError(
                f'Could not find Kustomize config for {self.name}'
            )

        for image in self.images:
            parser = ruamel.yaml.YAML()
            with config_file.open('rb') as stream:
                document = parser.load(stream)
            entries = document.setdefault('images', [])

            version = image.source.get_latest_version()
            new_tag = image.tag_format.format(version=version)

            existing = next(
                (e for e in entries if e['name'] == image.image),
                None,
            )
            if existing is None:
                entries.append({'name': image.image, 'newTag': new_tag})
            else:
                existing['newTag'] = new_tag

            with config_file.open('wb') as stream:
                parser.dump(document, stream)
            yield image, version

    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return the output of ``kubectl diff -k`` for the project directory.

        Returns ``None`` when ``kubectl`` cannot be executed, or when it exits
        non-zero without writing anything to standard output.
        """
        target = basedir / self.path
        try:
            proc = subprocess.run(
                ['kubectl', 'diff', '-k', target],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                check=False,
                encoding='utf-8',
            )
        except FileNotFoundError as e:
            log.warning('Cannot generate resource diff: %s', e)
            return None

        # A non-zero exit status alone is not treated as a failure as long as
        # some output was produced -- presumably because kubectl signals
        # "differences found" that way (TODO confirm).
        failed = proc.returncode != 0 and not proc.stdout
        if failed:
            log.error('Failed to generate resource diff: %s', proc.stderr)
            return None
        return proc.stdout
# Project type accepted in the configuration; currently Kustomize only.
Project = KustomizeProject
class RepoConfig(pydantic.BaseModel):
    """Location of, and credentials for, the Git repository to update.

    The repository's HTTP API is reached by prefixing the path of ``url``
    with ``/api/v1/repos``.
    """

    url: str
    # File whose (whitespace-stripped) content is the API token, if any.
    token_file: Optional[Path] = None
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository, derived from ``url``."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> Optional[str]:
        """Token read from ``token_file``, or ``None`` when no file is set."""
        if self.token_file:
            return self.token_file.read_text().strip()
        return None

    def _auth_headers(self, scheme: str) -> dict[str, str]:
        """Return an ``Authorization`` header dict, empty without a token.

        Leaving the header out entirely avoids sending the literal string
        "None" as a credential when no token is configured.
        """
        token = self.auth_token
        if not token:
            return {}
        return {'Authorization': f'{scheme} {token}'}

    @staticmethod
    def _error_message(r: requests.Response) -> str:
        """Return the ``message`` field of an error response, if it has one.

        Falls back to the repr of the raw body when the response is not JSON
        or carries no ``message``, so reporting a failure cannot itself fail.
        """
        try:
            return str(r.json()['message'])
        except (ValueError, KeyError, TypeError):
            return repr(r.content)

    def get_git_url(self) -> str:
        """Return the clone URL reported by the API, preferring ``ssh_url``.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        session = _get_session()
        # NOTE(review): this request uses the "Bearer" scheme while
        # create_pr() uses "token"; both are kept exactly as they were.
        r = session.get(
            self.repo_api_url,
            headers=self._auth_headers('Bearer'),
        )
        r.raise_for_status()
        data = r.json()
        return data.get('ssh_url') or data['clone_url']

    def create_pr(
        self,
        title: str,
        source_branch: str,
        target_branch: str,
        body: Optional[str] = None,
    ) -> None:
        """Open a pull request from *source_branch* into *target_branch*.

        The outcome is only logged: success at INFO, a 409 response at
        WARNING, and every other failure at ERROR.  Nothing is raised for an
        unsuccessful HTTP status.
        """
        session = _get_session()
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers=self._auth_headers('token'),
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
                'body': body,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 300 > r.status_code >= 200:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
        elif r.status_code == 409:
            log.warning('%s', self._error_message(r))
        elif r.status_code < 500:
            log.error('Failed to create PR: %s', self._error_message(r))
        else:
            log.error('Failed to create PR: %r', r.content)
class Config(pydantic.BaseModel):
    """Top-level model that the YAML configuration file is validated into."""

    # Repository that holds all projects.
    repo: RepoConfig
    # Projects that can be updated inside that repository.
    projects: list[Project]
class Arguments:
    """Typed namespace that the command-line parser fills in."""

    # Path of the configuration file (--config / -c).
    config: Path
    # Whether to only report what would be done (--dry-run / -n).
    dry_run: bool
    # Names of the projects selected on the command line.
    projects: list[str]
def update_project(
    repo: git.Repo, project: Project
) -> tuple[Optional[str], Optional[str]]:
    """Apply a project's updates, committing each one that changes files.

    Args:
        repo: Checked-out repository; its working directory is the base
            directory handed to the project.
        project: Project whose updates are applied.

    Returns:
        ``(title, diff)`` where *title* is the summary of the first commit
        created (``None`` when nothing was committed) and *diff* is the
        project's resource diff.  The diff is only computed when at least
        one commit was made; otherwise it is ``None``.
    """
    basedir = Path(repo.working_dir)
    title = None
    for image, version in project.apply_updates(basedir):
        log.info('Updating %s to %s', image.name, version)
        # diff(None) compares the index with the working tree.
        if not repo.index.diff(None):
            log.info('No changes to commit')
            continue
        log.debug('Committing changes to %s', project.path)
        repo.index.add(str(project.path))
        c = repo.index.commit(f'{image.name}: Update to {version}')
        log.info('Commited %s %s', str(c)[:7], c.summary)
        if not title:
            if not isinstance(c.summary, str):
                title = bytes(c.summary).decode('utf-8')
            else:
                title = c.summary
    if not title:
        # Nothing was committed, so there is nothing to describe; skip the
        # external diff command, whose result would go unused.
        return title, None
    return title, project.resource_diff(basedir)
def parse_args() -> Arguments:
    """Parse the command line into an ``Arguments`` namespace.

    Options: ``--config``/``-c`` (path, defaulting to
    ``updatebot/config.yml`` below ``XDG_CONFIG_HOME``), ``--dry-run``/``-n``
    (flag), and zero or more positional project names.
    """
    default_config = XDG_CONFIG_HOME / 'updatebot' / 'config.yml'

    parser = argparse.ArgumentParser()
    parser.add_argument('--config', '-c', type=Path, default=default_config)
    parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    parser.add_argument('projects', metavar='project', nargs='*', default=[])

    namespace = Arguments()
    return parser.parse_args(namespace=namespace)
def setup_logging() -> None:
    """Attach a colored stream handler to the root logger.

    The handler's threshold is taken from the ``LOG_LEVEL`` environment
    variable (upper-cased, default ``DEBUG``).  The root logger is set to
    DEBUG and this module's logger to TRACE.
    """
    line_format = (
        '%(log_color)s%(levelname)8s%(reset)s '
        '%(bold_white)s%(name)s%(reset)s '
        '%(message)s'
    )
    level_colors = {
        'TRACE': 'purple',
        'DEBUG': 'blue',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
    formatter = colorlog.ColoredFormatter(line_format, log_colors=level_colors)

    handler = colorlog.StreamHandler()
    handler.setFormatter(formatter)
    threshold = os.environ.get('LOG_LEVEL', 'DEBUG').upper()
    handler.setLevel(threshold)

    logging.root.addHandler(handler)
    logging.root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)
def main() -> None:
    """Entry point: update the selected projects and open pull requests.

    Loads the configuration, clones the repository into a temporary
    directory and, for every selected project, creates an
    ``updatebot/<project>`` branch from the configured base branch, applies
    and commits updates, then pushes and opens a pull request.  With
    ``--dry-run`` the push and pull request are replaced by a printed
    summary.
    """
    setup_logging()
    args = parse_args()
    yaml = ruamel.yaml.YAML()
    with args.config.open('rb') as f:
        data = yaml.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    known = [p.name for p in config.projects]
    projects = args.projects or known
    # A mistyped name would otherwise be ignored without any feedback.
    for name in projects:
        if name not in known:
            log.warning('Unknown project: %s', name)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        d = Path(d)
        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(repo_url, d, depth=1, b=config.repo.branch)
        # Remember the branch that was cloned.  Looking it up positionally
        # (heads[0]) later on would pick a previously created updatebot/*
        # branch whenever that name sorts before the base branch's name.
        base_branch = repo.active_branch

        for project in config.projects:
            # Skip unselected projects before touching the working tree.
            if project.name not in projects:
                continue
            branch_name = f'updatebot/{project.name}'
            log.debug('Checking out new branch: %s', branch_name)
            # -B (re)creates the branch starting at the base branch.
            base_branch.checkout(force=True, B=branch_name)

            title, diff = update_project(repo, project)
            if not title:
                log.info('No changes made')
                continue

            description = None
            if diff:
                description = (
                    '<details>\n<summary>Resource diff</summary>\n\n'
                    f'```diff\n{diff}```\n'
                    '</details>'
                )

            if not args.dry_run:
                repo.head.reference.set_tracking_branch(
                    git.RemoteReference(
                        repo, f'refs/remotes/origin/{branch_name}'
                    )
                )
                repo.remote().push(force=True)
                config.repo.create_pr(
                    title,
                    branch_name,
                    config.repo.branch,
                    description,
                )
            else:
                # Source and target branch were previously printed run
                # together with nothing in between.
                print(
                    'Would create PR',
                    f'{branch_name} -> {config.repo.branch}:',
                    title,
                )
                print(description or '')
# Run the updater only when executed as a script, not when imported.
if __name__ == '__main__':
    main()