349 lines
9.9 KiB
Python
349 lines
9.9 KiB
Python
#!/usr/bin/env python
|
|
import abc
|
|
import argparse
|
|
import logging
|
|
import functools
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import threading
|
|
import tomllib
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from typing import ClassVar, Literal, Union, Optional
|
|
|
|
import colorlog
|
|
import pydantic
|
|
import git
|
|
import requests
|
|
import ruamel.yaml
|
|
|
|
|
|
# Custom verbosity level numerically below DEBUG (10), registered so that it
# is shown as "TRACE" in log output and can be selected by name.
TRACE = 5
logging.addLevelName(TRACE, 'TRACE')

# Logger used by everything in this script.
log = logging.getLogger('updatebot')

# Base directory for per-user configuration files.  The XDG Base Directory
# specification says an unset *or empty* $XDG_CONFIG_HOME selects the
# default, so an empty value must not be turned into Path('') (the current
# working directory).
XDG_CONFIG_HOME = (
    Path(os.environ['XDG_CONFIG_HOME'])
    if os.environ.get('XDG_CONFIG_HOME')
    else Path('~/.config').expanduser()
)
|
|
|
|
|
|
# Thread-local storage; holds one lazily created HTTP session per thread.
tls = threading.local()


def _get_session() -> requests.Session:
    """Return the calling thread's HTTP session, creating it on first use.

    The session is cached on the thread-local ``tls`` object, so repeated
    calls from the same thread reuse a single ``requests.Session``.
    """
    try:
        return tls.session
    except AttributeError:
        # First call on this thread: nothing cached yet.
        log.debug('Starting new HTTP/HTTPS client session')
        tls.session = requests.Session()
        return tls.session
|
|
|
|
|
|
class BaseSource(abc.ABC, pydantic.BaseModel):
    """Abstract pydantic model for a place where versions are looked up.

    Concrete subclasses implement :meth:`get_latest_version`.
    """

    @abc.abstractmethod
    def get_latest_version(self) -> str:
        """Return the most recent version available from this source."""
        raise NotImplementedError
|
|
|
|
|
|
class GithubSource(BaseSource):
    """Version source that reads the latest release of a GitHub repository."""

    # Literal tag identifying this source type in configuration data.
    kind: Literal['github']
    # Owner and repository name, interpolated into the GitHub API URL.
    organization: str
    repo: str
    # Pattern searched for in the release name; the ``version`` named group
    # is what gets returned.
    version_re: str = r'(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)'

    def get_latest_version(self) -> str:
        """Return the version of the repository's latest GitHub release.

        The release name (or, when the release has no name, its tag name)
        is searched with ``version_re``.  If the pattern does not match, a
        warning is logged and the unmodified name is returned.

        Raises:
            requests.HTTPError: if the GitHub API answers with an error
                status (e.g. 404 for a repository without releases).
        """
        session = _get_session()
        url = (
            f'https://api.github.com/repos/'
            f'{self.organization}/{self.repo}/releases/latest'
        )
        r = session.get(url)
        # Error responses carry a different JSON payload; fail here with a
        # clear HTTP error instead of a KeyError further down.
        r.raise_for_status()
        release = r.json()
        # A release's name may be null or empty; its tag name is the best
        # remaining hint at the version.
        name = release.get('name') or release['tag_name']
        m = re.search(self.version_re, name)
        if not m:
            log.warning(
                'Release name "%s" did not match regular expression "%s"',
                name,
                self.version_re,
            )
            return name
        return m.groupdict()['version']
|
|
|
|
|
|
class DockerHubSource(BaseSource):
    """Version source that inspects the tags of a Docker Hub repository."""

    # Literal tag identifying this source type in configuration data.
    kind: Literal['docker']
    # Namespace and repository, interpolated into the Docker Hub API URL.
    namespace: str
    repository: str
    # Pattern each tag name must match (anchored at the start by
    # ``re.match``); the ``version`` named group is what gets returned.
    version_re: str = r'^(?P<version>[0-9]+(\.[0-9]+)*(-[0-9]+)?)$'

    def get_latest_version(self) -> str:
        """Return the version of the most recently updated matching tag.

        Tags whose name does not match ``version_re`` are skipped.  Among
        the rest, the one with the greatest ``last_updated`` value wins
        (ties are broken by the version string).

        NOTE(review): only the ``results`` of the first response are
        examined; no further pages are requested.

        Raises:
            requests.HTTPError: if Docker Hub answers with an error status.
            ValueError: if no tag matches ``version_re``.
        """
        session = _get_session()
        url = (
            f'https://hub.docker.com/v2/'
            f'namespaces/{self.namespace}/repositories/{self.repository}/tags'
        )
        r = session.get(url)
        # Fail with a clear HTTP error rather than a KeyError on 'results'.
        r.raise_for_status()
        data = r.json()
        regex = re.compile(self.version_re)
        versions = []
        for result in data['results']:
            m = regex.match(result['name'])
            if not m:
                log.debug(
                    'Skipping tag %s: does not match regex %s',
                    result['name'],
                    self.version_re,
                )
                continue
            # Guard against a null timestamp so that comparing the tuples
            # never mixes None with str.
            updated = result['last_updated'] or ''
            versions.append((updated, m.groupdict()['version']))
        if not versions:
            raise ValueError(
                f'No tag of {self.namespace}/{self.repository} matches '
                f'regular expression {self.version_re!r}'
            )
        return max(versions)[-1]
|
|
|
|
|
|
# Every concrete version-source model a project may be configured with.
Source = Union[GithubSource, DockerHubSource]
|
|
|
|
|
|
class BaseProject(abc.ABC, pydantic.BaseModel):
    """Abstract pydantic model describing one updatable project.

    Concrete subclasses implement :meth:`apply_update`.
    """

    # Optional location of the project; ``None`` unless configured.
    path: Optional[Path] = None
    # Where the latest version is looked up.
    source: Source
    # Name of the container image the project refers to.
    image: str
    # ``str.format`` template receiving ``version``.
    tag_format: str = '{version}'

    @abc.abstractmethod
    def apply_update(self, path: Path, version: str) -> None:
        """Rewrite the project found at *path* so that it uses *version*."""
        raise NotImplementedError
|
|
|
|
|
|
class KustomizeProject(BaseProject):
    """Project whose image tag is set in a Kustomize ``images`` list."""

    # Literal tag identifying this project type in configuration data.
    kind: Literal['kustomize']

    # File names probed, in this order, to find the kustomization file.
    kustomize_files: ClassVar[list[str]] = [
        'kustomization.yaml',
        'kustomization.yml',
        'Kustomization',
    ]

    def apply_update(self, path: Path, version: str) -> None:
        """Set ``newTag`` for this project's image in the kustomization file.

        The first of ``kustomize_files`` that exists under *path* is loaded;
        if none exists, the first name is used anyway (and opening it
        fails).  The entry in ``images`` whose ``name`` equals ``self.image``
        gets its ``newTag`` replaced; when there is no such entry, one is
        appended.  The document is then written back to the same file.
        """
        candidates = (path / filename for filename in self.kustomize_files)
        filepath = next(
            (candidate for candidate in candidates if candidate.is_file()),
            path / self.kustomize_files[0],
        )

        yaml = ruamel.yaml.YAML()
        with filepath.open('rb') as f:
            kustomization = yaml.load(f)

        images = kustomization.setdefault('images', [])
        new_tag = self.tag_format.format(version=version)
        entry = next(
            (image for image in images if image['name'] == self.image),
            None,
        )
        if entry is None:
            images.append({'name': self.image, 'newTag': new_tag})
        else:
            entry['newTag'] = new_tag

        with filepath.open('wb') as f:
            yaml.dump(kustomization, f)
|
|
|
|
|
|
class DirectoryProject(BaseProject):
    """Project type selected by ``kind`` ``'dir'`` or ``'directory'``.

    NOTE(review): no ``apply_update`` override is defined here, so the
    abstract method inherited from ``BaseProject`` remains unimplemented;
    presumably this class cannot be instantiated as is -- confirm whether
    an implementation is missing.
    """

    # Literal tag identifying this project type in configuration data.
    kind: Literal['dir'] | Literal['directory']
|
|
|
|
|
|
# Every concrete project model that may appear in the configuration.
Project = Union[KustomizeProject, DirectoryProject]
|
|
|
|
|
|
class RepoConfig(pydantic.BaseModel):
    """Settings for the Git repository that receives the updates."""

    # Web URL of the repository; the API URL is derived from it.
    url: str
    # File whose (whitespace-stripped) contents are the API token.
    token_file: Path
    # Branch that is cloned and used as the pull request target.
    branch: str = 'master'

    @functools.cached_property
    def repo_api_url(self) -> str:
        """API endpoint of the repository: ``url`` with its path prefixed
        by ``/api/v1/repos``."""
        urlparts = urllib.parse.urlsplit(self.url)
        urlparts = urlparts._replace(path=f'/api/v1/repos{urlparts.path}')
        return urllib.parse.urlunsplit(urlparts)

    @functools.cached_property
    def auth_token(self) -> str:
        """API token read from ``token_file`` (read once, then cached)."""
        return self.token_file.read_text().strip()

    def get_git_url(self) -> str:
        """Return the URL to clone the repository from.

        The repository is looked up through the API; its ``ssh_url`` is
        preferred and ``clone_url`` is used when no SSH URL is reported.

        Raises:
            requests.HTTPError: if the API answers with an error status
                (for example because the token is rejected).
        """
        session = _get_session()
        r = session.get(
            self.repo_api_url,
            headers={
                'Authorization': f'token {self.auth_token}',
            },
        )
        # An error payload has no clone URLs; report the HTTP failure
        # itself rather than a KeyError on 'clone_url'.
        r.raise_for_status()
        data = r.json()
        if ssh_url := data.get('ssh_url'):
            return ssh_url
        return data['clone_url']

    def create_pr(
        self, title: str, source_branch: str, target_branch: str
    ) -> None:
        """Open a pull request from *source_branch* into *target_branch*.

        The outcome is only logged, never raised: success at INFO, a 409
        response at WARNING (with the server's message), anything else at
        ERROR.
        """
        session = _get_session()
        r = session.post(
            f'{self.repo_api_url}/pulls',
            headers={
                'Authorization': f'token {self.auth_token}',
            },
            json={
                'title': title,
                'base': target_branch,
                'head': source_branch,
            },
        )
        log.log(TRACE, '%r', r.content)
        if 200 <= r.status_code < 300:
            data = r.json()
            log.info('Created pull request: %s', data['url'])
            return
        if r.status_code >= 500:
            log.error('Failed to create PR: %r', r.content)
            return
        # Below 500 the body is expected to be JSON with a 'message', but
        # a proxy or misconfigured server may send something else; fall
        # back to the raw body instead of crashing while reporting.
        try:
            message = r.json()['message']
        except (ValueError, KeyError, TypeError):
            log.error('Failed to create PR: %r', r.content)
            return
        if r.status_code == 409:
            log.warning('%s', message)
        else:
            log.error('Failed to create PR: %s', message)
|
|
|
|
|
|
class Config(pydantic.BaseModel):
    """Top-level model the configuration file is validated against."""

    # Repository that receives the update commits and pull request.
    repo: RepoConfig
    # Project definitions keyed by project name.
    projects: dict[str, Project]
|
|
|
|
|
|
class Arguments:
    """Typed namespace for the parsed command-line arguments.

    Only annotations are declared here; the attribute values are assigned
    by ``argparse`` when an instance is passed as its ``namespace``.
    """

    # Path of the configuration file (``--config`` / ``-c``).
    config: Path
    # Name of the branch to create (``--branch-name`` / ``-b``).
    branch_name: str
    # Whether ``--dry-run`` / ``-n`` was given.
    dry_run: bool
    # Project names given as positional arguments; may be empty.
    projects: list[str]
|
|
|
|
|
|
def update_project(
    repo: git.Repo, name: str, project: Project
) -> Optional[git.Commit]:
    """Update one project to its latest version and commit the result.

    The latest version is obtained from ``project.source`` and applied to
    the project directory (``project.path`` if set, otherwise *name*)
    inside the repository's working tree.

    Returns:
        The new commit, or ``None`` when the index shows no difference
        against the working tree after the update was applied.
    """
    basedir = Path(repo.working_dir)

    log.debug('Checking for latest version of %s', name)
    latest = project.source.get_latest_version()
    log.info('Found version %s for %s', latest, name)

    log.debug('Applying update for %s version %s', name, latest)
    path = basedir / (project.path or name)
    project.apply_update(path, latest)

    # diff(None) compares the index with the working tree.
    if not repo.index.diff(None):
        log.info('No changes to commit')
        return None

    log.debug('Committing changes to %s', path)
    repo.index.add(str(path))
    commit = repo.index.commit(f'{name}: Update to {latest}')
    log.info('Commited %s %s', str(commit)[:7], commit.summary)
    return commit
|
|
|
|
|
|
def parse_args() -> Arguments:
    """Parse the command line into an :class:`Arguments` instance.

    Options:
        ``--config`` / ``-c``: configuration file; defaults to
            ``updatebot/config.toml`` below ``XDG_CONFIG_HOME``.
        ``--branch-name`` / ``-b``: branch name, default ``updatebot``.
        ``--dry-run`` / ``-n``: flag, off by default.
        ``project`` ...: zero or more project names.
    """
    default_config = XDG_CONFIG_HOME / 'updatebot' / 'config.toml'

    parser = argparse.ArgumentParser()
    parser.add_argument('--config', '-c', type=Path, default=default_config)
    parser.add_argument('--branch-name', '-b', default='updatebot')
    parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    parser.add_argument('projects', metavar='project', nargs='*', default=[])

    namespace = Arguments()
    return parser.parse_args(namespace=namespace)
|
|
|
|
|
|
def setup_logging() -> None:
    """Install a colorized stream handler on the root logger.

    The handler's threshold comes from the ``LOG_LEVEL`` environment
    variable (upper-cased, default ``DEBUG``).  The root logger is set to
    DEBUG and the ``updatebot`` logger to TRACE.
    """
    formatter = colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)8s%(reset)s '
        '%(bold_white)s%(name)s%(reset)s '
        '%(message)s',
        log_colors={
            'TRACE': 'purple',
            'DEBUG': 'blue',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'bold_red',
        },
    )
    level_name = os.environ.get('LOG_LEVEL', 'DEBUG').upper()

    handler = colorlog.StreamHandler()
    handler.setFormatter(formatter)
    handler.setLevel(level_name)

    logging.root.addHandler(handler)
    logging.root.setLevel(logging.DEBUG)
    log.setLevel(TRACE)
|
|
|
|
|
|
def main() -> None:
    """Run the updater: load config, update projects, push, open a PR.

    Steps:
        1. Set up logging, parse arguments and validate the TOML config.
        2. Select the projects named on the command line (unknown names
           are warned about and dropped) or all configured projects; exit
           with status 1 if none remain.
        3. Shallow-clone the repository into a temporary directory and
           check out the work branch.
        4. Update each project; the summary of the first commit made
           becomes the pull request title.
        5. If anything was committed, set the tracking branch and, unless
           ``--dry-run`` was given, force-push and create the pull request.
    """
    setup_logging()
    args = parse_args()

    with args.config.open('rb') as f:
        data = tomllib.load(f)
    config = Config.model_validate(data)
    log.debug('Using configuration: %s', config)

    known = list(config.projects.keys())
    if args.projects:
        selected = []
        for requested in args.projects:
            if requested in known:
                selected.append(requested)
            else:
                log.warning('Unknown project: %s', requested)
    else:
        selected = known

    if not selected:
        log.error('No projects')
        raise SystemExit(1)
    if log.isEnabledFor(logging.INFO):
        log.info('Updating projects: %s', ', '.join(selected))

    with tempfile.TemporaryDirectory(prefix='updatebot.') as tmpdir:
        log.debug('Using temporary working directory: %s', tmpdir)
        workdir = Path(tmpdir)

        log.debug('Retreiving repository Git URL')
        repo_url = config.repo.get_git_url()
        repo = git.Repo.clone_from(
            repo_url, workdir, depth=1, b=config.repo.branch
        )

        log.debug('Checking out new branch: %s', args.branch_name)
        repo.heads[0].checkout(force=True, B=args.branch_name)

        title = None
        for project_name in selected:
            commit = update_project(
                repo, project_name, config.projects[project_name]
            )
            # Only the first commit made supplies the title.
            if not commit or title:
                continue
            if isinstance(commit.summary, str):
                title = commit.summary
            else:
                title = bytes(commit.summary).decode(
                    'utf-8', errors='replace'
                )

        if not title:
            log.info('No changes made')
            return

        repo.head.reference.set_tracking_branch(
            git.RemoteReference(
                repo, f'refs/remotes/origin/{args.branch_name}'
            )
        )
        if args.dry_run:
            return
        repo.remote().push(force=True)
        config.repo.create_pr(title, args.branch_name, config.repo.branch)
|
|
|
|
|
|
# Run the updater only when this file is executed as a script.
if __name__ == '__main__':
    main()
|