393 lines
11 KiB
Python
393 lines
11 KiB
Python
#!/usr/bin/env python
|
|
import abc
|
|
import argparse
|
|
import logging
|
|
import functools
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from typing import ClassVar, Iterable, Literal, Union, Optional
|
|
|
|
import colorlog
|
|
import pydantic
|
|
import git
|
|
import requests
|
|
import ruamel.yaml
|
|
|
|
|
|
# Module-wide logger used by everything in this script.
log = logging.getLogger('updatebot')

# Custom log level below DEBUG (10); used for dumping raw HTTP response
# bodies.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

# Base directory for per-user configuration files.  The XDG Base Directory
# specification says an unset *or empty* XDG_CONFIG_HOME means "use
# ~/.config", so an empty value must not be turned into a path.
XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if os.environ.get('XDG_CONFIG_HOME')
    else Path('~/.config').expanduser()
)

# Thread-local storage; holds one HTTP session per thread (see
# _get_session).
tls = threading.local()
|
|
|
|
|
|
def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use.

    The session is stored on the module-level thread-local ``tls`` so that
    each thread reuses its own ``requests.Session``.
    """
    try:
        return tls.session
    except AttributeError:
        # First call on this thread: nothing has been stored yet.
        pass
    log.debug('Starting new HTTP/HTTPS client session')
    tls.session = requests.Session()
    return tls.session
|
|
|
|
|
|
class BaseSource(abc.ABC, pydantic.BaseModel):
    """Abstract base for objects that report an upstream's newest version."""

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the latest available version as a string.

        Concrete sources must override this method.
        """
        raise NotImplementedError
|
|
|
|
|
|
class GithubSource(BaseSource):
    """Version source backed by the latest release of a GitHub repository."""

    # Literal tag identifying this source type among the ``Source`` members.
    kind: Literal['github']
    organization: str
    repo: str
    # Pattern searched for in the release name; must define a ``version``
    # named group.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version of the repository's latest GitHub release.

        The release name is searched with ``version_re``.  If it does not
        match, a warning is logged and the raw name is returned unchanged.

        Raises:
            requests.HTTPError: if the GitHub API answers with an error
                status (e.g. unknown repository or rate limiting).
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Fail with the real HTTP error instead of a confusing KeyError on
        # the error payload further down.
        r.raise_for_status()
        release = r.json()
        # A release's name is optional on GitHub (it may be null or empty);
        # fall back to the tag the release was created from.
        name = release.get('name') or release['tag_name']
        m = re.search(self.version_re, name)
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                name,
                self.version_re,
            )
            return name
        return m.groupdict()['version']
|
|
|
|
|
|
class DockerHubSource(BaseSource):
    """Version source backed by the tags of a Docker Hub repository."""

    # Literal tag identifying this source type among the ``Source`` members.
    kind: Literal['docker']
    namespace: str
    repository: str
    # Pattern a tag name must match (anchored at both ends by default); must
    # define a ``version`` named group.
    version_re: str = r'^(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)$'

    def get_latest_version(self) -> str:
        """Return the version of the most recently updated matching tag.

        Tags whose name does not match ``version_re`` are skipped.  Among
        the remaining tags the one with the greatest ``last_updated`` value
        wins.  Only the single response returned by the tags endpoint is
        examined; no further pages are requested.

        Raises:
            requests.HTTPError: if Docker Hub answers with an error status.
            ValueError: if no tag matches ``version_re``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        # Surface HTTP failures directly rather than as a KeyError on the
        # error payload.
        r.raise_for_status()
        data = r.json()
        regex = re.compile(self.version_re)
        versions = []
        for result in data['results']:
            m = regex.match(result['name'])
            if not m:
                log.debug(
                    'Skipping tag %s: does not match regex %s',
                    result['name'],
                    self.version_re,
                )
                continue
            version = m.groupdict()['version']
            # A missing/null timestamp sorts first instead of breaking the
            # comparison between None and str.
            versions.append((result.get('last_updated') or '', version))
        if not versions:
            raise ValueError(
                f'No tag of {self.namespace}/{self.repository} matches '
                f'regular expression "{self.version_re}"'
            )
        # Tuples compare by timestamp first, then by version string.
        return max(versions)[-1]
|
|
|
|
|
|
# Any of the supported version sources; each member carries its own ``kind``
# literal.
Source = Union[GithubSource, DockerHubSource]
|
|
|
|
|
|
class ImageDef(pydantic.BaseModel):
    """A container image tracked by a project, and where its versions come from."""

    # Human-readable name; appears in log and commit messages.
    name: str
    # Image name as matched against the ``name`` of Kustomize ``images``
    # entries.
    image: str
    # Where to look up the latest version.
    source: Source
    # ``str.format`` template, given a ``version`` field, producing the tag.
    tag_format: str = '{version}'
|
|
|
|
|
|
class BaseProject(abc.ABC, pydantic.BaseModel):
    """Abstract base for a deployable project living in the repository."""

    name: str
    # Location of the project, joined onto the repository working directory.
    path: Path

    def __init__(self, **data) -> None:
        """Build the model, using ``name`` as ``path`` when none is given."""
        if 'path' not in data:
            data['path'] = data.get('name')
        super().__init__(**data)

    @abc.abstractmethod
    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Apply pending image updates under *basedir*.

        Implementations yield an ``(image, version)`` pair per image.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return a textual diff of the project's resources, or ``None``."""
        raise NotImplementedError
|
|
|
|
|
|
class KustomizeProject(BaseProject):
    """Project whose image tags are pinned in a Kustomize configuration file."""

    kind: Literal['kustomize']
    images: list[ImageDef]

    # File names probed, in this order, to locate the Kustomize config.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_updates(self, basedir: Path) -> Iterable[tuple[ImageDef, str]]:
        """Set each image's ``newTag`` to its latest version, one at a time.

        This is a generator: for every configured image the Kustomize file
        is loaded, updated, and written back *before* the
        ``(image, version)`` pair is yielded, so the consumer sees the file
        already modified on disk.

        Raises:
            ValueError: on first iteration, if none of ``kustomize_files``
                exists in the project directory.
        """
        project_dir = basedir / self.path
        candidates = (project_dir / fn for fn in self.kustomize_files)
        filepath = next((c for c in candidates if c.is_file()), None)
        if filepath is None:
            raise ValueError(
                f'Could not find Kustomize config for {self.name}'
            )

        for image in self.images:
            yaml = ruamel.yaml.YAML()
            with filepath.open('rb') as f:
                kustomization = yaml.load(f)
            images = kustomization.setdefault('images', [])
            version = image.source.get_latest_version()
            new_tag = image.tag_format.format(version=version)
            # Update the existing entry for this image, or add one.
            entry = next(
                (e for e in images if e['name'] == image.image), None
            )
            if entry is None:
                images.append({'name': image.image, 'newTag': new_tag})
            else:
                entry['newTag'] = new_tag
            with filepath.open('wb') as f:
                yaml.dump(kustomization, f)
            yield (image, version)

    def resource_diff(self, basedir: Path) -> Optional[str]:
        """Return the output of ``kubectl diff -k`` for the project directory.

        Returns ``None`` when ``kubectl`` cannot be executed, or when it
        exits non-zero without printing anything on stdout.
        """
        target = basedir / self.path
        try:
            proc = subprocess.run(
                ['kubectl', 'diff', '-k', target],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                check=False,
                encoding='utf-8',
            )
        except FileNotFoundError as e:
            log.warning('Cannot generate resource diff: %s', e)
            return None
        # A non-zero status alone is not treated as failure; presumably
        # kubectl signals "differences found" that way -- confirm against
        # the kubectl documentation.
        failed = proc.returncode != 0 and not proc.stdout
        if failed:
            log.error('Failed to generate resource diff: %s', proc.stderr)
            return None
        return proc.stdout
|
|
|
|
|
|
# Project type used for the entries of ``Config.projects``; it is an alias
# for KustomizeProject.
Project = KustomizeProject
|
|
|
|
|
|
class RepoConfig(pydantic.BaseModel):
    """Location of, and credentials for, the Git repository to update.

    The API is reached under ``/api/v1/repos`` on the same host as ``url``
    (this looks like a Gitea-style API -- confirm against the deployment).
    """

    # Web URL of the repository; its path is appended to ``/api/v1/repos``.
    url: str
    # Optional file containing the API access token.
    token_file: Optional[Path] = None
    # Branch that is cloned and used as the pull request target.
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API URL of the repository, derived from ``url``."""
        urlparts = urllib.parse.urlsplit(self.url)
        # Drop a trailing slash so that appending '/pulls' later does not
        # produce an empty path segment.
        repo_path = urlparts.path.rstrip('/')
        urlparts = urlparts._replace(path=f'/api/v1/repos{repo_path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> Optional[str]:
        """Token read from ``token_file`` (stripped), or ``None`` if unset."""
        if self.token_file:
            return self.token_file.read_text().strip()
        return None

    def get_git_url(self) -> str:
        """Ask the API for the repository's clone URL.

        Returns the ``ssh_url`` reported by the API when present and
        non-empty, otherwise its ``clone_url``.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        session = _get_session()
        headers = {}
        # NOTE(review): this request uses the "Bearer" scheme while
        # create_pr uses "token"; confirm the server accepts both.
        if token := self.auth_token:
            headers['Authorization'] = f'Bearer {token}'
        r = session.get(self.repo_api_url, headers=headers)
        # Report the HTTP failure itself instead of a KeyError on the error
        # payload.
        r.raise_for_status()
        data = r.json()
        return data.get('ssh_url') or data['clone_url']

    def create_pr(
        self,
        title: str,
        source_branch: str,
        target_branch: str,
        body: Optional[str] = None,
    ) -> None:
        """Open a pull request from *source_branch* into *target_branch*.

        The outcome is only logged; failures do not raise.  A 409 response
        is logged as a warning, any other non-success response as an error.
        """
        session = _get_session()
        headers = {}
        # Only authenticate when a token is configured; never send the
        # literal string "token None".
        if token := self.auth_token:
            headers['Authorization'] = f'token {token}'
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers=headers,
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
                'body': body,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 200 <= r.status_code < 300:
            log.info('Created pull request: %s', r.json()['url'])
            return
        if r.status_code >= 500:
            log.error('Failed to create PR: %r', r.content)
            return
        # Error bodies are not guaranteed to be JSON with a 'message' key
        # (e.g. a proxy error page); fall back to the raw text.
        try:
            message = r.json()['message']
        except (ValueError, KeyError, TypeError):
            message = r.text
        if r.status_code == 409:
            log.warning('%s', message)
        else:
            log.error('Failed to create PR: %s', message)
|
|
|
|
|
|
class Config(pydantic.BaseModel):
    """Top-level configuration: the repository and the projects within it."""

    repo: RepoConfig
    projects: list[Project]
|
|
|
|
|
|
class Arguments:
    """Typed namespace that ``parse_args`` fills with command-line values."""

    # Path of the YAML configuration file.
    config: Path
    # When set, nothing is pushed and no pull request is created.
    dry_run: bool
    # Names of the projects to update; empty means all configured projects.
    projects: list[str]
|
|
|
|
|
|
def update_project(
    repo: git.Repo, project: Project
) -> tuple[Optional[str], Optional[str]]:
    """Apply a project's image updates, committing each change separately.

    Args:
        repo: Repository whose working tree contains the project.
        project: Project to update.

    Returns:
        ``(title, diff)`` where *title* is the summary of the first commit
        made and *diff* is the project's resource diff (may be ``None``).
        When nothing was committed the result is ``(None, None)`` and no
        resource diff is generated.
    """
    basedir = Path(repo.working_dir)
    title = None
    for image, version in project.apply_updates(basedir):
        log.info('Updating %s to %s', image.name, version)
        # Compare the index against the working tree; an empty result means
        # the update left the files unchanged.
        if not repo.index.diff(None):
            log.info('No changes to commit')
            continue
        log.debug('Committing changes to %s', project.path)
        repo.index.add(str(project.path))
        c = repo.index.commit(f'{image.name}: Update to {version}')
        log.info('Committed %s %s', str(c)[:7], c.summary)
        if not title:
            summary = c.summary
            # The summary may be bytes rather than str; normalise to str.
            if isinstance(summary, str):
                title = summary
            else:
                title = bytes(summary).decode('utf-8')
    if title is None:
        # Nothing changed, so there is nothing to describe; skip the
        # external diff command entirely.
        return None, None
    return title, project.resource_diff(basedir)
|
|
|
|
|
|
def parse_args() -> Arguments:
    """Parse the command line into an ``Arguments`` namespace.

    Options: ``--config/-c`` (configuration file path, defaulting to
    ``updatebot/config.yml`` under ``XDG_CONFIG_HOME``), ``--dry-run/-n``,
    and zero or more positional project names.
    """
    default_config = XDG_CONFIG_HOME / 'updatebot' / 'config.yml'
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', '-c', type=Path, default=default_config)
    parser.add_argument('--dry-run', '-n', default=False, action='store_true')
    parser.add_argument('projects', nargs='*', metavar='project', default=[])
    namespace = Arguments()
    return parser.parse_args(namespace=namespace)
|
|
|
|
|
|
def setup_logging() -> None:
    """Attach a colourised stream handler to the root logger.

    The handler's threshold comes from the ``LOG_LEVEL`` environment
    variable (default ``DEBUG``).  The root logger is set to DEBUG and the
    ``updatebot`` logger to TRACE.
    """
    level_colors = {
        'TRACE': 'purple',
        'DEBUG': 'blue',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
    fmt = (
        '%(log_color)s%(levelname)8s%(reset)s '
        '%(bold_white)s%(name)s%(reset)s '
        '%(message)s'
    )
    formatter = colorlog.ColoredFormatter(fmt, log_colors=level_colors)

    handler = colorlog.StreamHandler()
    handler.setFormatter(formatter)
    level_name = os.environ.get('LOG_LEVEL', 'DEBUG').upper()
    handler.setLevel(level_name)

    root = logging.root
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)
|
|
|
|
|
|
def main() -> None:
    """Entry point: update the selected projects and open pull requests.

    Loads the configuration, clones the repository into a temporary
    directory, and for each selected project creates an
    ``updatebot/<name>`` branch from the configured base branch, applies
    updates, and -- unless ``--dry-run`` is given -- force-pushes the branch
    and creates a pull request.
    """
    setup_logging()
    args = parse_args()
    yaml = ruamel.yaml.YAML()
    with args.config.open('rb') as f:
        data = yaml.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    known = [p.name for p in config.projects]
    projects = args.projects or known
    # A mistyped project name would otherwise be ignored without a trace.
    for name in projects:
        if name not in known:
            log.warning('Unknown project: %s', name)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(projects))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as d:
        log.debug('Using temporary working directory: %s', d)
        workdir = Path(d)
        log.debug('Retrieving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(
            repo_url, workdir, depth=1, b=config.repo.branch
        )
        # Look the base branch up by name.  ``repo.heads[0]`` is only the
        # base branch as long as its name sorts before every
        # ``updatebot/...`` branch created below.
        base = repo.heads[config.repo.branch]
        for project in config.projects:
            # Filter first so that no branch is created for projects that
            # are not being updated.
            if project.name not in projects:
                continue
            branch_name = f'updatebot/{project.name}'
            log.debug('Checking out new branch: %s', branch_name)
            base.checkout(force=True, B=branch_name)
            title, diff = update_project(repo, project)
            if not title:
                log.info('No changes made')
                continue
            description = None
            if diff:
                description = (
                    '<details>\n<summary>Resource diff</summary>\n\n'
                    f'```diff\n{diff}```\n'
                    '</details>'
                )
            if not args.dry_run:
                repo.head.reference.set_tracking_branch(
                    git.RemoteReference(
                        repo, f'refs/remotes/origin/{branch_name}'
                    )
                )
                repo.remote().push(force=True)
                config.repo.create_pr(
                    title,
                    branch_name,
                    config.repo.branch,
                    description,
                )
            else:
                print(
                    'Would create PR',
                    f'{branch_name} → {config.repo.branch}:',
                    title,
                )
                print(description or '')


if __name__ == '__main__':
    main()
|