1
0
Fork 0

host/online: Set TTL for AMQP messages

Setting messages to expire after 10 minutes without being consumed.  If
they haven't been consumed by then, there must be something wrong with
the host provisioner.  Since each host provisioner process only
processes a single message, placing more messages onto the queue without
an expiration will cause a backlog of messages that cannot be processed.
master
Dustin 2025-02-08 09:44:25 -06:00
parent 0c2c045fd2
commit e9da2cdd28
1 changed files with 13 additions and 1 deletions

View File

@ -67,6 +67,8 @@ PAPERLESS_URL = os.environ.get(
ANSIBLE_JOB_YAML = Path(os.environ.get('ANSIBLE_JOB_YAML', 'ansible-job.yaml'))
ANSIBLE_JOB_NAMESPACE = os.environ.get('ANSIBLE_JOB_NAMESPACE', 'ansible')
HOST_INFO_QUEUE = os.environ.get('HOST_INFO_QUEUE', 'host-provisioner')
HOST_INFO_TTL = 600000
KUBERNETES_TOKEN_PATH = Path(
os.environ.get(
'KUBERNETES_TOKEN_PATH',
@ -364,12 +366,19 @@ class AMQPContext:
)
self._channel = await fut
def publish(self, exchange: str, routing_key: str, body: bytes):
def publish(
self,
exchange: str,
routing_key: str,
body: bytes,
properties: Optional[pika.BasicProperties] = None,
):
assert self._channel
self._channel.basic_publish(
exchange,
routing_key,
body,
properties,
)
async def queue_declare(
@ -620,10 +629,13 @@ async def publish_host_info(
data['branch'] = branch
await context.amqp.connect()
await context.amqp.queue_declare(HOST_INFO_QUEUE, durable=True)
properties = pika.BasicProperties()
properties.expiration = str(HOST_INFO_TTL)
context.amqp.publish(
exchange='',
routing_key=HOST_INFO_QUEUE,
body=json.dumps(data).encode('utf-8'),
properties=properties,
)