From 0e414267db2437ae4c75ff607536657aaa913cb5 Mon Sep 17 00:00:00 2001 From: Anler Hp Date: Thu, 26 Jun 2014 08:54:36 +0200 Subject: [PATCH] Integration with RabbitMQ and Celery First update taiga-vagrant vm if you're using it, you will have access to the rabbit management console at port 8001 of the host machine. * Defining tasks - Tasks must be defined in a `deferred` module of an app, for example, `taiga.projects.deferred` module. - Tasks must be decorated and given the name ".", for example in `taiga.projects.deferred` module: ``` from taiga.celery import app @app.task(name="projects.add") def add(x, y): return x + y ``` - Tasks should be at most just wrappers around service functions to promote re-usability of those functions from other parts of the code, say, management commands and the like. * Calling tasks Tasks should be called using one of the three functions defined in `taiga.deferred` module: - `defer`: Use this function if you need to perform some task asynchronously and GET THE RESULT back later, for example: ``` result = defer("projects.add", x=1, y=2) ... result.get() # => 3 ``` - `call_async`: Use this function when you want to fire off some task. No result is get back. For example: ``` call_async("projects.email_user", user) ``` - `apply_async`: Is the same as `call_async` but since it's a function application and you must pass the args and kwargs together as one parameter each you are allowed to pass celery-specific extra-options (but bear in mind this is not recommended!) ``` apply_async("projects.email_user", args=(user,), kwargs={}, routing_key="tasks.email") ``` --- requirements.txt | 2 ++ settings/__init__.py | 2 ++ settings/celery.py | 16 ++++++++++++++ taiga/__init__.py | 1 + taiga/celery.py | 12 +++++++++++ taiga/deferred.py | 43 +++++++++++++++++++++++++++++++++++++ taiga/services/__init__.py | 0 taiga/services/celery.py | 8 +++++++ tests/unit/test_deferred.py | 35 ++++++++++++++++++++++++++++++ 9 files changed, 119 insertions(+) create mode 100644 settings/celery.py create mode 100644 taiga/celery.py create mode 100644 taiga/deferred.py create mode 100644 taiga/services/__init__.py create mode 100644 taiga/services/celery.py create mode 100644 tests/unit/test_deferred.py diff --git a/requirements.txt b/requirements.txt index 0beaec04..32159b61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ enum34==1.0 django-reversion==1.8.1 easy-thumbnails==2.0 +celery==3.1.12 +redis==2.10 diff --git a/settings/__init__.py b/settings/__init__.py index a43a344c..dd336c53 100644 --- a/settings/__init__.py +++ b/settings/__init__.py @@ -17,6 +17,8 @@ from __future__ import absolute_import, print_function import os, sys +from .celery import * + try: print("Trying import local.py settings...", file=sys.stderr) from .local import * diff --git a/settings/celery.py b/settings/celery.py new file mode 100644 index 00000000..227c52ae --- /dev/null +++ b/settings/celery.py @@ -0,0 +1,16 @@ +from kombu import Exchange, Queue + +BROKER_URL = 'amqp://guest:guest@localhost:5672//' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + +CELERY_TIMEZONE = 'Europe/Madrid' +CELERY_ENABLE_UTC = True + +CELERY_DEFAULT_QUEUE = 'tasks' +CELERY_QUEUES = ( + Queue('tasks', routing_key='task.#'), + Queue('transient', routing_key='transient.#', delivery_mode=1) +) +CELERY_DEFAULT_EXCHANGE = 'tasks' +CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' +CELERY_DEFAULT_ROUTING_KEY = 'task.default' diff --git a/taiga/__init__.py b/taiga/__init__.py index e69de29b..a10bec52 100644 --- a/taiga/__init__.py +++ b/taiga/__init__.py @@ -0,0 +1 @@ +from . import celery diff --git a/taiga/celery.py b/taiga/celery.py new file mode 100644 index 00000000..01a93494 --- /dev/null +++ b/taiga/celery.py @@ -0,0 +1,12 @@ +import os + +from celery import Celery + +from django.conf import settings + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name="deferred") diff --git a/taiga/deferred.py b/taiga/deferred.py new file mode 100644 index 00000000..e7e9b3e8 --- /dev/null +++ b/taiga/deferred.py @@ -0,0 +1,43 @@ +from .celery import app + + +def defer(task: str, *args, **kwargs): + """Defer the execution of a task. + + Defer the execution of a task and returns a future objects with the following methods among + others: + - `failed()` Returns `True` if the task failed. + - `ready()` Returns `True` if the task has been executed. + - `forget()` Forget about the result. + - `get()` Wait until the task is ready and return its result. + - `result` When the task has been executed the result is in this attribute. + More info at Celery docs on `AsyncResult` object. + + :param task: Name of the task to execute. + + :return: A future object. + """ + return app.send_task(task, args, kwargs, routing_key="transient.deferred") + + +def call_async(task: str, *args, **kwargs): + """Run a task and ignore its result. + + This is just a star argument version of `apply_async`. + + :param task: Name of the task to execute. + :param args: Arguments for the task. + :param kwargs: Keyword arguments for the task. + """ + apply_async(task, args, kwargs) + + +def apply_async(task: str, args=None, kwargs=None, **options): + """Run a task and ignore its result. + + :param task: Name of the task to execute. + :param args: Tupple of arguments for the task. + :param kwargs: Dict of keyword arguments for the task. + :param options: Celery-specific options when running the task. See Celery docs on `apply_async` + """ + app.send_task(task, args, kwargs, **options) diff --git a/taiga/services/__init__.py b/taiga/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/taiga/services/celery.py b/taiga/services/celery.py new file mode 100644 index 00000000..f3a3b890 --- /dev/null +++ b/taiga/services/celery.py @@ -0,0 +1,8 @@ +from celery import Celery + +from django.conf import settings + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/tests/unit/test_deferred.py b/tests/unit/test_deferred.py new file mode 100644 index 00000000..29836421 --- /dev/null +++ b/tests/unit/test_deferred.py @@ -0,0 +1,35 @@ +from unittest import mock +from taiga.deferred import defer, call_async, apply_async + + +@mock.patch("taiga.deferred.app") +def test_defer(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + defer(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs, routing_key="transient.deferred") + + +@mock.patch("taiga.deferred.app") +def test_apply_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + apply_async(name, args, kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs) + + +@mock.patch("taiga.deferred.app") +def test_call_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + call_async(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs)