Merge pull request #886 from taigaio/emit-events-fix
Fix sending events accidentally before commitremotes/origin/issue/4795/notification_even_they_are_disabled
commit
3d1d60db32
|
@ -50,17 +50,21 @@ class EventsPushBackend(base.BaseEventsPushBackend):
|
||||||
|
|
||||||
def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
|
def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
|
||||||
connection = _make_rabbitmq_connection(self.url)
|
connection = _make_rabbitmq_connection(self.url)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rchannel = connection.channel()
|
connection.connect()
|
||||||
message = AmqpMessage(message)
|
except ConnectionRefusedError:
|
||||||
|
log.error("EventsPushBackend: Unable to connect with RabbitMQ at {}".format(self.url),
|
||||||
|
exc_info=True)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
message = AmqpMessage(message)
|
||||||
|
rchannel = connection.channel()
|
||||||
|
|
||||||
rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True)
|
rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True)
|
||||||
rchannel.basic_publish(message, routing_key=routing_key, exchange=channel)
|
rchannel.basic_publish(message, routing_key=routing_key, exchange=channel)
|
||||||
rchannel.close()
|
rchannel.close()
|
||||||
|
except Exception:
|
||||||
except Exception:
|
log.error("EventsPushBackend: Unhandled exception",
|
||||||
log.error("Unhandled exception", exc_info=True)
|
exc_info=True)
|
||||||
|
finally:
|
||||||
finally:
|
connection.close()
|
||||||
connection.close()
|
|
||||||
|
|
|
@ -17,10 +17,9 @@
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
import json
|
|
||||||
import collections
|
import collections
|
||||||
|
|
||||||
from django.contrib.contenttypes.models import ContentType
|
from django.db import connection
|
||||||
|
|
||||||
from taiga.base.utils import json
|
from taiga.base.utils import json
|
||||||
from taiga.base.utils.db import get_typename_for_model_instance
|
from taiga.base.utils.db import get_typename_for_model_instance
|
||||||
|
@ -39,7 +38,8 @@ watched_types = set([
|
||||||
|
|
||||||
|
|
||||||
def emit_event(data:dict, routing_key:str, *,
|
def emit_event(data:dict, routing_key:str, *,
|
||||||
sessionid:str=None, channel:str="events"):
|
sessionid:str=None, channel:str="events",
|
||||||
|
on_commit:bool=True):
|
||||||
if not sessionid:
|
if not sessionid:
|
||||||
sessionid = mw.get_current_session_id()
|
sessionid = mw.get_current_session_id()
|
||||||
|
|
||||||
|
@ -47,9 +47,14 @@ def emit_event(data:dict, routing_key:str, *,
|
||||||
"data": data}
|
"data": data}
|
||||||
|
|
||||||
backend = backends.get_events_backend()
|
backend = backends.get_events_backend()
|
||||||
return backend.emit_event(message=json.dumps(data),
|
|
||||||
routing_key=routing_key,
|
def backend_emit_event():
|
||||||
channel=channel)
|
backend.emit_event(message=json.dumps(data), routing_key=routing_key, channel=channel)
|
||||||
|
|
||||||
|
if on_commit:
|
||||||
|
connection.on_commit(backend_emit_event)
|
||||||
|
else:
|
||||||
|
backend_emit_event()
|
||||||
|
|
||||||
|
|
||||||
def emit_event_for_model(obj, *, type:str="change", channel:str="events",
|
def emit_event_for_model(obj, *, type:str="change", channel:str="events",
|
||||||
|
|
|
@ -33,4 +33,4 @@ class Command(BaseCommand):
|
||||||
"desc": options["description"],
|
"desc": options["description"],
|
||||||
}
|
}
|
||||||
routing_key = "notifications"
|
routing_key = "notifications"
|
||||||
emit_event(data, routing_key)
|
emit_event(data, routing_key, on_commit=False)
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
from django.db.models import signals
|
from django.db.models import signals
|
||||||
from django.db import connection
|
|
||||||
|
|
||||||
from django.dispatch import receiver
|
from django.dispatch import receiver
|
||||||
|
|
||||||
|
@ -43,8 +42,7 @@ def on_save_any_model(sender, instance, created, **kwargs):
|
||||||
if created:
|
if created:
|
||||||
type = "create"
|
type = "create"
|
||||||
|
|
||||||
emit_event = lambda: events.emit_event_for_model(instance, sessionid=sesionid, type=type)
|
events.emit_event_for_model(instance, sessionid=sesionid, type=type)
|
||||||
connection.on_commit(emit_event)
|
|
||||||
|
|
||||||
|
|
||||||
def on_delete_any_model(sender, instance, **kwargs):
|
def on_delete_any_model(sender, instance, **kwargs):
|
||||||
|
@ -57,5 +55,4 @@ def on_delete_any_model(sender, instance, **kwargs):
|
||||||
|
|
||||||
sesionid = mw.get_current_session_id()
|
sesionid = mw.get_current_session_id()
|
||||||
|
|
||||||
emit_event = lambda: events.emit_event_for_model(instance, sessionid=sesionid, type="delete")
|
events.emit_event_for_model(instance, sessionid=sesionid, type="delete")
|
||||||
connection.on_commit(emit_event)
|
|
||||||
|
|
Loading…
Reference in New Issue