1
0
Fork 0

host/online: AMQP connection/auth config

The `AMQPContext` now supports reading connection and authentication
information from environment variables, allowing it to connect to
RabbitMQ servers other than the default (`localhost:6372`, user
_guest_).  It supports plain and TLS connection modes, as well as plain
username+password or EXTERNAL authentication.
master
Dustin 2025-02-03 20:22:09 -06:00
parent 34dbcdece6
commit 88130dd72f
1 changed files with 51 additions and 5 deletions

View File

@ -7,6 +7,7 @@ import json
import logging import logging
import os import os
import re import re
import ssl
from pathlib import Path from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import Any, Optional, Self, Type from typing import Any, Optional, Self, Type
@ -15,6 +16,7 @@ import fastapi
import httpx import httpx
import pika import pika
import pika.channel import pika.channel
import pika.credentials
import pydantic import pydantic
import pyrfc6266 import pyrfc6266
import ruamel.yaml import ruamel.yaml
@ -64,6 +66,7 @@ PAPERLESS_URL = os.environ.get(
ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml')) ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml'))
ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible') ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible')
HOST_INFO_QUEUE = os.environ.get('HOST_INFO_QUEUE', 'host-provisioner')
KUBERNETES_TOKEN_PATH = Path( KUBERNETES_TOKEN_PATH = Path(
os.environ.get( os.environ.get(
'KUBERNETES_TOKEN_PATH', 'KUBERNETES_TOKEN_PATH',
@ -285,11 +288,54 @@ class AMQPError(Exception): ...
class AMQPContext: class AMQPContext:
def __init__(self) -> None: def __init__(
self._conn_params = pika.ConnectionParameters() self,
conn_params: Optional[
pika.ConnectionParameters | pika.URLParameters
] = None,
) -> None:
self._conn_params = conn_params
self._connection: Optional[AsyncioConnection] = None self._connection: Optional[AsyncioConnection] = None
self._channel: Optional[pika.channel.Channel] = None self._channel: Optional[pika.channel.Channel] = None
@classmethod
def from_env(cls) -> Self:
if 'AMQP_URL' in os.environ:
params = pika.URLParameters(os.environ['AMQP_URL'])
else:
kwargs = {}
if host := os.environ.get('AMQP_HOST'):
kwargs['host'] = host
if port := os.environ.get('AMQP_PORT'):
kwargs['port'] = int(port)
if vhost := os.environ.get('AMQP_VIRTUAL_HOST'):
kwargs['virtual_host'] = vhost
if username := os.environ.get('AMQP_USERNAME'):
password = os.environ.get('AMQP_PASSWORD', '')
kwargs['credentials'] = pika.PlainCredentials(
username, password
)
elif os.environ.get('AMQP_EXTERNAL_CREDENTIALS'):
kwargs['credentials'] = pika.credentials.ExternalCredentials()
if (
'AMQP_CA_CERT' in os.environ
or 'AMQP_CLIENT_CERT' in os.environ
or 'AMQP_CLIENT_KEY' in os.environ
):
sslctx = ssl.create_default_context(
cafile=os.environ.get('AMQP_CA_CERT')
)
if certfile := os.environ.get('AMQP_CLIENT_CERT'):
keyfile = os.environ.get('AMQP_CLIENT_KEY')
keypassword = os.environ.get('AMQP_CLIENT_KEY_PASSWORD')
sslctx.load_cert_chain(certfile, keyfile, keypassword)
kwargs['ssl_options'] = pika.SSLOptions(
sslctx, kwargs.get('host')
)
params = pika.ConnectionParameters(**kwargs)
return cls(params)
def close(self): def close(self):
if self._channel: if self._channel:
self._channel.close() self._channel.close()
@ -420,7 +466,7 @@ class Kubernetes(HttpxClientMixin):
class Context: class Context:
def __init__(self): def __init__(self):
self.amqp = AMQPContext() self.amqp = AMQPContext.from_env()
async def handle_firefly_transaction(xact: FireflyIIITransaction) -> None: async def handle_firefly_transaction(xact: FireflyIIITransaction) -> None:
@ -565,10 +611,10 @@ async def start_ansible_job():
async def publish_host_info(hostname: str, sshkeys: str): async def publish_host_info(hostname: str, sshkeys: str):
await context.amqp.connect() await context.amqp.connect()
await context.amqp.queue_declare('host-provision', durable=True) await context.amqp.queue_declare(HOST_INFO_QUEUE, durable=True)
context.amqp.publish( context.amqp.publish(
exchange='', exchange='',
routing_key='host-provision', routing_key=HOST_INFO_QUEUE,
body=json.dumps( body=json.dumps(
{ {
'hostname': hostname, 'hostname': hostname,