diff --git a/taiga/events/backends/rabbitmq.py b/taiga/events/backends/rabbitmq.py index 829dcf3a..c09113c6 100644 --- a/taiga/events/backends/rabbitmq.py +++ b/taiga/events/backends/rabbitmq.py @@ -50,17 +50,21 @@ class EventsPushBackend(base.BaseEventsPushBackend): def emit_event(self, message:str, *, routing_key:str, channel:str="events"): connection = _make_rabbitmq_connection(self.url) - try: - rchannel = connection.channel() - message = AmqpMessage(message) + connection.connect() + except ConnectionRefusedError: + log.error("EventsPushBackend: Unable to connect with RabbitMQ at {}".format(self.url), + exc_info=True) + else: + try: + message = AmqpMessage(message) + rchannel = connection.channel() - rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True) - rchannel.basic_publish(message, routing_key=routing_key, exchange=channel) - rchannel.close() - - except Exception: - log.error("Unhandled exception", exc_info=True) - - finally: - connection.close() + rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True) + rchannel.basic_publish(message, routing_key=routing_key, exchange=channel) + rchannel.close() + except Exception: + log.error("EventsPushBackend: Unhandled exception", + exc_info=True) + finally: + connection.close() diff --git a/taiga/events/events.py b/taiga/events/events.py index 2df3dd37..bbd8735c 100644 --- a/taiga/events/events.py +++ b/taiga/events/events.py @@ -17,10 +17,9 @@ # along with this program. If not, see . -import json import collections -from django.contrib.contenttypes.models import ContentType +from django.db import connection from taiga.base.utils import json from taiga.base.utils.db import get_typename_for_model_instance @@ -39,7 +38,8 @@ watched_types = set([ def emit_event(data:dict, routing_key:str, *, - sessionid:str=None, channel:str="events"): + sessionid:str=None, channel:str="events", + on_commit:bool=True): if not sessionid: sessionid = mw.get_current_session_id() @@ -47,9 +47,14 @@ def emit_event(data:dict, routing_key:str, *, "data": data} backend = backends.get_events_backend() - return backend.emit_event(message=json.dumps(data), - routing_key=routing_key, - channel=channel) + + def backend_emit_event(): + backend.emit_event(message=json.dumps(data), routing_key=routing_key, channel=channel) + + if on_commit: + connection.on_commit(backend_emit_event) + else: + backend_emit_event() def emit_event_for_model(obj, *, type:str="change", channel:str="events", diff --git a/taiga/events/management/commands/emit_notification_message.py b/taiga/events/management/commands/emit_notification_message.py index 7ae7facc..ed5365d2 100644 --- a/taiga/events/management/commands/emit_notification_message.py +++ b/taiga/events/management/commands/emit_notification_message.py @@ -33,4 +33,4 @@ class Command(BaseCommand): "desc": options["description"], } routing_key = "notifications" - emit_event(data, routing_key) + emit_event(data, routing_key, on_commit=False) diff --git a/taiga/events/signal_handlers.py b/taiga/events/signal_handlers.py index cb2a1250..e310fcf1 100644 --- a/taiga/events/signal_handlers.py +++ b/taiga/events/signal_handlers.py @@ -17,7 +17,6 @@ # along with this program. If not, see . from django.db.models import signals -from django.db import connection from django.dispatch import receiver @@ -43,8 +42,7 @@ def on_save_any_model(sender, instance, created, **kwargs): if created: type = "create" - emit_event = lambda: events.emit_event_for_model(instance, sessionid=sesionid, type=type) - connection.on_commit(emit_event) + events.emit_event_for_model(instance, sessionid=sesionid, type=type) def on_delete_any_model(sender, instance, **kwargs): @@ -57,5 +55,4 @@ def on_delete_any_model(sender, instance, **kwargs): sesionid = mw.get_current_session_id() - emit_event = lambda: events.emit_event_for_model(instance, sessionid=sesionid, type="delete") - connection.on_commit(emit_event) + events.emit_event_for_model(instance, sessionid=sesionid, type="delete")