Taiga-events integration (realtime taiga)

remotes/origin/enhancement/email-actions
Andrey Antukh 2014-09-17 10:36:40 +02:00 committed by Jesús Espino
parent b14c8d84bf
commit 43e16c2c13
15 changed files with 291 additions and 125 deletions

View File

@ -7,6 +7,7 @@ psycopg2==2.5.4
pillow==2.5.3 pillow==2.5.3
pytz==2014.4 pytz==2014.4
six==1.8.0 six==1.8.0
amqp==1.4.6
djmail==0.9 djmail==0.9
django-pgjson==0.2.0 django-pgjson==0.2.0
djorm-pgarray==1.0.4 djorm-pgarray==1.0.4

View File

@ -87,7 +87,9 @@ DJMAIL_MAX_RETRY_NUMBER = 3
DJMAIL_TEMPLATE_EXTENSION = "jinja" DJMAIL_TEMPLATE_EXTENSION = "jinja"
# Events backend # Events backend
EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend" # EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend"
EVENTS_PUSH_BACKEND = "taiga.events.backends.rabbitmq.EventsPushBackend"
EVENTS_PUSH_BACKEND_OPTIONS = {"url": "//guest:guest@127.0.0.1/"}
# Message System # Message System
MESSAGE_STORAGE = "django.contrib.messages.storage.session.SessionStorage" MESSAGE_STORAGE = "django.contrib.messages.storage.session.SessionStorage"

View File

@ -0,0 +1,17 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
default_app_config = "taiga.events.apps.EventsAppConfig"

39
taiga/events/apps.py Normal file
View File

@ -0,0 +1,39 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from django.apps import AppConfig
from django.db.models import signals
from . import signal_handlers as handlers
def connect_events_signals():
signals.post_save.connect(handlers.on_save_any_model, dispatch_uid="events_change")
signals.post_delete.connect(handlers.on_delete_any_model, dispatch_uid="events_delete")
def disconnect_events_signals():
signals.post_save.disconnect(dispatch_uid="events_change")
signals.post_delete.disconnect(dispatch_uid="events_delete")
class EventsAppConfig(AppConfig):
name = "taiga.events"
verbose_name = "Events App Config"
def ready(self):
connect_events_signals()

View File

@ -21,7 +21,7 @@ from django.conf import settings
class BaseEventsPushBackend(object, metaclass=abc.ABCMeta): class BaseEventsPushBackend(object, metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
def emit_event(self, message:str, *, channel:str="events"): def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
pass pass

View File

@ -20,7 +20,10 @@ from . import base
class EventsPushBackend(base.BaseEventsPushBackend): class EventsPushBackend(base.BaseEventsPushBackend):
@transaction.atomic @transaction.atomic
def emit_event(self, message:str, *, channel:str="events"): def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
routing_key = routing_key.replace(".", "__")
channel = "{channel}_{routing_key}".format(channel=channel,
routing_key=routing_key)
sql = "NOTIFY {channel}, %s".format(channel=channel) sql = "NOTIFY {channel}, %s".format(channel=channel)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute(sql, [message]) cursor.execute(sql, [message])

View File

@ -0,0 +1,65 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
from amqp import Connection as AmqpConnection
from amqp.basic_message import Message as AmqpMessage
from urllib.parse import urlparse
from . import base
log = logging.getLogger("tagia.events")
def _make_rabbitmq_connection(url):
parse_result = urlparse(url)
# Parse host & user/password
try:
(authdata, host) = parse_result.netloc.split("@")
except Exception as e:
raise RuntimeError("Invalid url") from e
try:
(user, password) = authdata.split(":")
except Exception:
(user, password) = ("guest", "guest")
vhost = parse_result.path
return AmqpConnection(host=host, userid=user,
password=password, virtual_host=vhost)
class EventsPushBackend(base.BaseEventsPushBackend):
def __init__(self, url):
self.url = url
def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
connection = _make_rabbitmq_connection(self.url)
try:
rchannel = connection.channel()
message = AmqpMessage(message)
rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True)
rchannel.basic_publish(message, routing_key=routing_key, exchange=channel)
rchannel.close()
except Exception:
log.error("Unhandled exception", exc_info=True)
finally:
connection.close()

View File

@ -1,61 +0,0 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from django.contrib.contenttypes.models import ContentType
from . import backends
# The complete list of content types
# of allowed models for change events
watched_types = (
("userstories", "userstory"),
("issues", "issue"),
)
def _get_type_for_model(model_instance):
"""
Get content type tuple from model instance.
"""
ct = ContentType.objects.get_for_model(model_instance)
return (ct.app_label, ct.model)
def emit_change_event_for_model(model_instance, sessionid:str, *,
type:str="change", channel:str="events"):
"""
Emit change event for notify of model change to
all connected frontends.
"""
content_type = _get_type_for_model(model_instance)
assert hasattr(model_instance, "project_id")
assert content_type in watched_types
assert type in ("create", "change", "delete")
project_id = model_instance.project_id
routing_key = "project.{0}".format(project_id)
data = {"type": "model-changes",
"routing_key": routing_key,
"session_id": sessionid,
"data": {
"type": type,
"matches": ".".join(content_type),
"pk": model_instance.pk}}
backend = backends.get_events_backend()
return backend.emit_event(json.dumps(data), channel="events")

101
taiga/events/events.py Normal file
View File

@ -0,0 +1,101 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import collections
from django.contrib.contenttypes.models import ContentType
from taiga.base.utils import json
from . import middleware as mw
from . import backends
# The complete list of content types
# of allowed models for change events
watched_types = set([
"userstories.userstory",
"issues.issue",
"tasks.task",
"wiki.wiki_page",
"milestones.milestone",
])
def _get_type_for_model(model_instance):
"""
Get content type tuple from model instance.
"""
ct = ContentType.objects.get_for_model(model_instance)
return ".".join([ct.app_label, ct.model])
def emit_event(data:dict, routing_key:str, *,
sessionid:str=None, channel:str="events"):
if not sessionid:
sessionid = mw.get_current_session_id()
data = {"session_id": sessionid,
"data": data}
backend = backends.get_events_backend()
return backend.emit_event(message=json.dumps(data),
routing_key=routing_key,
channel=channel)
def emit_event_for_model(obj, *, type:str="change", channel:str="events",
content_type:str=None, sessionid:str=None):
"""
Sends a model change event.
"""
assert type in set(["create", "change", "delete"])
assert hasattr(obj, "project_id")
if not content_type:
content_type = _get_type_for_model(obj)
projectid = getattr(obj, "project_id")
pk = getattr(obj, "pk", None)
app_name, model_name = content_type.split(".", 1)
routing_key = "changes.project.{0}.{1}".format(projectid, app_name)
data = {"type": type,
"matches": content_type,
"pk": pk}
return emit_event(routing_key=routing_key,
channel=channel,
sessionid=sessionid,
data=data)
def emit_event_for_ids(ids, content_type:str, projectid:int, *,
type:str="change", channel:str="events", sessionid:str=None):
assert type in set(["create", "change", "delete"])
assert isinstance(ids, collections.Iterable)
assert content_type, "content_type parameter is mandatory"
app_name, model_name = content_type.split(".", 1)
routing_key = "changes.project.{0}.{1}".format(projectid, app_name)
data = {"type": type,
"matches": content_type,
"pk": ids}
return emit_event(routing_key=routing_key,
channel=channel,
sessionid=sessionid,
data=data)

View File

@ -1,53 +0,0 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import signals
from django.dispatch import receiver
from . import middleware as mw
from . import changes
@receiver(signals.post_save, dispatch_uid="events_dispatcher_on_change")
def on_save_any_model(sender, instance, created, **kwargs):
# Ignore any object that can not have project_id
content_type = changes._get_type_for_model(instance)
# Ignore changes on import
if getattr(instance, '_importing', False):
return
# Ignore any other changes
if content_type not in changes.watched_types:
return
sesionid = mw.get_current_session_id()
if created:
changes.emit_change_event_for_model(instance, sesionid, type="create")
else:
changes.emit_change_event_for_model(instance, sesionid, type="change")
@receiver(signals.post_delete, dispatch_uid="events_dispatcher_on_delete")
def on_delete_any_model(sender, instance, **kwargs):
# Ignore any object that can not have project_id
content_type = changes._get_type_for_model(instance)
# Ignore any other changes
if content_type not in changes.watched_types:
return
sesionid = mw.get_current_session_id()
changes.emit_change_event_for_model(instance, sesionid, type="delete")

View File

@ -0,0 +1,34 @@
from django.db.models import signals
from django.dispatch import receiver
from . import middleware as mw
from . import events
def on_save_any_model(sender, instance, created, **kwargs):
# Ignore any object that can not have project_id
content_type = events._get_type_for_model(instance)
# Ignore any other events
if content_type not in events.watched_types:
return
sesionid = mw.get_current_session_id()
if created:
events.emit_event_for_model(instance, sessionid=sesionid, type="create")
else:
events.emit_event_for_model(instance, sessionid=sesionid, type="change")
def on_delete_any_model(sender, instance, **kwargs):
# Ignore any object that can not have project_id
content_type = events._get_type_for_model(instance)
# Ignore any other changes
if content_type not in events.watched_types:
return
sesionid = mw.get_current_session_id()
events.emit_event_for_model(instance, sessionid=sesionid, type="delete")

View File

@ -14,11 +14,13 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
import datetime
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db import transaction from django.db import transaction
from django.utils.timezone import now from django.utils.timezone import now
from django.conf import settings from django.conf import settings
from django.contrib.webdesign import lorem_ipsum from django.contrib.webdesign import lorem_ipsum
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
@ -34,9 +36,8 @@ from taiga.projects.wiki.models import *
from taiga.projects.attachments.models import * from taiga.projects.attachments.models import *
from taiga.projects.history.services import take_snapshot from taiga.projects.history.services import take_snapshot
from taiga.events.apps import disconnect_events_signals
import random
import datetime
ATTACHMENT_SAMPLE_DATA = [ ATTACHMENT_SAMPLE_DATA = [
"taiga/projects/management/commands/sample_data", "taiga/projects/management/commands/sample_data",
@ -102,6 +103,9 @@ class Command(BaseCommand):
@transaction.atomic @transaction.atomic
def handle(self, *args, **options): def handle(self, *args, **options):
# Prevent events emission when sample data is running
disconnect_events_signals()
self.users = [User.objects.get(is_superuser=True)] self.users = [User.objects.get(is_superuser=True)]
# create users # create users

View File

@ -87,7 +87,9 @@ class UserStoryViewSet(OCCResourceMixin, HistoryResourceMixin, WatchedResourceMi
project = get_object_or_404(Project, pk=data["project_id"]) project = get_object_or_404(Project, pk=data["project_id"])
self.check_permissions(request, "bulk_update_order", project) self.check_permissions(request, "bulk_update_order", project)
services.update_userstories_order_in_bulk(data["bulk_stories"], field="backlog_order") services.update_userstories_order_in_bulk(data["bulk_stories"],
project=project,
field="backlog_order")
services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user)
return response.NoContent() return response.NoContent()
@ -102,7 +104,9 @@ class UserStoryViewSet(OCCResourceMixin, HistoryResourceMixin, WatchedResourceMi
project = get_object_or_404(Project, pk=data["project_id"]) project = get_object_or_404(Project, pk=data["project_id"])
self.check_permissions(request, "bulk_update_order", project) self.check_permissions(request, "bulk_update_order", project)
services.update_userstories_order_in_bulk(data["bulk_stories"], field="sprint_order") services.update_userstories_order_in_bulk(data["bulk_stories"],
project=project,
field="sprint_order")
services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user)
return response.NoContent() return response.NoContent()
@ -116,7 +120,9 @@ class UserStoryViewSet(OCCResourceMixin, HistoryResourceMixin, WatchedResourceMi
project = get_object_or_404(Project, pk=data["project_id"]) project = get_object_or_404(Project, pk=data["project_id"])
self.check_permissions(request, "bulk_update_order", project) self.check_permissions(request, "bulk_update_order", project)
services.update_userstories_order_in_bulk(data["bulk_stories"], field="kanban_order") services.update_userstories_order_in_bulk(data["bulk_stories"],
project=project,
field="kanban_order")
services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user)
return response.NoContent() return response.NoContent()

View File

@ -18,6 +18,7 @@ from django.utils import timezone
from taiga.base.utils import db, text from taiga.base.utils import db, text
from taiga.projects.history.services import take_snapshot from taiga.projects.history.services import take_snapshot
from taiga.events import events
from . import models from . import models
@ -48,7 +49,7 @@ def create_userstories_in_bulk(bulk_data, callback=None, precall=None, **additio
return userstories return userstories
def update_userstories_order_in_bulk(bulk_data:list, field:str): def update_userstories_order_in_bulk(bulk_data:list, field:str, project:object):
""" """
Update the order of some user stories. Update the order of some user stories.
`bulk_data` should be a list of tuples with the following format: `bulk_data` should be a list of tuples with the following format:
@ -61,6 +62,10 @@ def update_userstories_order_in_bulk(bulk_data:list, field:str):
user_story_ids.append(us_data["us_id"]) user_story_ids.append(us_data["us_id"])
new_order_values.append({field: us_data["order"]}) new_order_values.append({field: us_data["order"]})
events.emit_event_for_ids(ids=user_story_ids,
content_type="userstories.userstory",
projectid=project.pk)
db.update_in_bulk_with_ids(user_story_ids, new_order_values, model=models.UserStory) db.update_in_bulk_with_ids(user_story_ids, new_order_values, model=models.UserStory)

View File

@ -36,8 +36,11 @@ User Story #2
def test_update_userstories_order_in_bulk(): def test_update_userstories_order_in_bulk():
data = [{"us_id": 1, "order": 1}, {"us_id": 2, "order": 2}] data = [{"us_id": 1, "order": 1}, {"us_id": 2, "order": 2}]
project = mock.Mock()
project.pk = 1
with mock.patch("taiga.projects.userstories.services.db") as db: with mock.patch("taiga.projects.userstories.services.db") as db:
services.update_userstories_order_in_bulk(data, "backlog_order") services.update_userstories_order_in_bulk(data, "backlog_order", project)
db.update_in_bulk_with_ids.assert_called_once_with([1, 2], [{"backlog_order": 1}, {"backlog_order": 2}], db.update_in_bulk_with_ids.assert_called_once_with([1, 2], [{"backlog_order": 1}, {"backlog_order": 2}],
model=models.UserStory) model=models.UserStory)