External applications support
parent
a3d996bf6b
commit
bd09e23b61
|
@ -16,6 +16,7 @@
|
|||
- Now users can watch public issues, tasks and user stories.
|
||||
- Add endpoints to show the watchers list for issues, tasks and user stories.
|
||||
- Add headers to allow threading for notification emails about changes to issues, tasks, user stories, and wiki pages. (thanks to [@brett](https://github.com/brettp)).
|
||||
- Add externall apps: now Taiga can integrate with hundreds of applications and service.
|
||||
- i18n.
|
||||
- Add polish (pl) translation.
|
||||
- Add portuguese (Brazil) (pt_BR) translation.
|
||||
|
|
|
@ -31,6 +31,7 @@ premailer==2.8.1
|
|||
django-transactional-cleanup==0.1.14
|
||||
lxml==3.4.1
|
||||
git+https://github.com/Xof/django-pglocks.git@dbb8d7375066859f897604132bd437832d2014ea
|
||||
pyjwkest==1.0.3
|
||||
|
||||
# Comment it if you are using python >= 3.4
|
||||
enum34==1.0
|
||||
|
|
|
@ -266,6 +266,7 @@ INSTALLED_APPS = [
|
|||
"taiga.front",
|
||||
"taiga.users",
|
||||
"taiga.userstorage",
|
||||
"taiga.external_apps",
|
||||
"taiga.projects",
|
||||
"taiga.projects.references",
|
||||
"taiga.projects.custom_attributes",
|
||||
|
@ -384,6 +385,9 @@ REST_FRAMEWORK = {
|
|||
|
||||
# Mainly used for api debug.
|
||||
"taiga.auth.backends.Session",
|
||||
|
||||
# Application tokens auth
|
||||
"taiga.external_apps.auth_backends.Token",
|
||||
),
|
||||
"DEFAULT_THROTTLE_CLASSES": (
|
||||
"taiga.base.throttling.AnonRateThrottle",
|
||||
|
|
|
@ -31,9 +31,11 @@ from .viewsets import ModelCrudViewSet
|
|||
from .viewsets import ModelUpdateRetrieveViewSet
|
||||
from .viewsets import GenericViewSet
|
||||
from .viewsets import ReadOnlyListViewSet
|
||||
from .viewsets import ModelRetrieveViewSet
|
||||
|
||||
__all__ = ["ModelCrudViewSet",
|
||||
"ModelListViewSet",
|
||||
"ModelUpdateRetrieveViewSet",
|
||||
"GenericViewSet",
|
||||
"ReadOnlyListViewSet"]
|
||||
"ReadOnlyListViewSet",
|
||||
"ModelRetrieveViewSet"]
|
||||
|
|
|
@ -167,3 +167,7 @@ class ModelUpdateRetrieveViewSet(mixins.UpdateModelMixin,
|
|||
mixins.RetrieveModelMixin,
|
||||
GenericViewSet):
|
||||
pass
|
||||
|
||||
class ModelRetrieveViewSet(mixins.RetrieveModelMixin,
|
||||
GenericViewSet):
|
||||
pass
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.contrib import admin
|
||||
|
||||
from . import models
|
||||
|
||||
|
||||
class ApplicationAdmin(admin.ModelAdmin):
|
||||
readonly_fields=("id",)
|
||||
|
||||
admin.site.register(models.Application, ApplicationAdmin)
|
||||
|
||||
|
||||
class ApplicationTokenAdmin(admin.ModelAdmin):
|
||||
readonly_fields=("token",)
|
||||
search_fields = ("user__username", "user__full_name", "user__email", "application__name")
|
||||
|
||||
admin.site.register(models.ApplicationToken, ApplicationTokenAdmin)
|
|
@ -0,0 +1,104 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from . import serializers
|
||||
from . import models
|
||||
from . import permissions
|
||||
from . import services
|
||||
|
||||
from taiga.base import response
|
||||
from taiga.base import exceptions as exc
|
||||
from taiga.base.api import ModelCrudViewSet, ModelRetrieveViewSet
|
||||
from taiga.base.api.utils import get_object_or_404
|
||||
from taiga.base.decorators import list_route, detail_route
|
||||
|
||||
from django.db import transaction
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
|
||||
class Application(ModelRetrieveViewSet):
|
||||
serializer_class = serializers.ApplicationSerializer
|
||||
permission_classes = (permissions.ApplicationPermission,)
|
||||
model = models.Application
|
||||
|
||||
@detail_route(methods=["GET"])
|
||||
def token(self, request, *args, **kwargs):
|
||||
if self.request.user.is_anonymous():
|
||||
raise exc.NotAuthenticated(_("Authentication required"))
|
||||
|
||||
application = get_object_or_404(models.Application, **kwargs)
|
||||
self.check_permissions(request, 'token', request.user)
|
||||
try:
|
||||
application_token = models.ApplicationToken.objects.get(user=request.user, application=application)
|
||||
application_token.update_auth_code()
|
||||
application_token.state = request.GET.get("state", None)
|
||||
application_token.save()
|
||||
|
||||
except models.ApplicationToken.DoesNotExist:
|
||||
application_token = models.ApplicationToken(
|
||||
user=request.user,
|
||||
application=application
|
||||
)
|
||||
|
||||
auth_code_data = serializers.ApplicationTokenSerializer(application_token).data
|
||||
return response.Ok(auth_code_data)
|
||||
|
||||
|
||||
class ApplicationToken(ModelCrudViewSet):
|
||||
serializer_class = serializers.ApplicationTokenSerializer
|
||||
permission_classes = (permissions.ApplicationTokenPermission,)
|
||||
|
||||
def get_queryset(self):
|
||||
if self.request.user.is_anonymous():
|
||||
raise exc.NotAuthenticated(_("Authentication required"))
|
||||
|
||||
return models.ApplicationToken.objects.filter(user=self.request.user)
|
||||
|
||||
@list_route(methods=["POST"])
|
||||
def authorize(self, request, pk=None):
|
||||
if self.request.user.is_anonymous():
|
||||
raise exc.NotAuthenticated(_("Authentication required"))
|
||||
|
||||
application_id = request.DATA.get("application", None)
|
||||
state = request.DATA.get("state", None)
|
||||
application_token = services.authorize_token(application_id, request.user, state)
|
||||
|
||||
auth_code_data = serializers.AuthorizationCodeSerializer(application_token).data
|
||||
return response.Ok(auth_code_data)
|
||||
|
||||
@list_route(methods=["POST"])
|
||||
def validate(self, request, pk=None):
|
||||
application_id = request.DATA.get("application", None)
|
||||
auth_code = request.DATA.get("auth_code", None)
|
||||
state = request.DATA.get("state", None)
|
||||
application_token = get_object_or_404(models.ApplicationToken,
|
||||
application__id=application_id,
|
||||
auth_code=auth_code,
|
||||
state=state)
|
||||
|
||||
application_token.generate_token()
|
||||
application_token.save()
|
||||
|
||||
access_token_data = serializers.AccessTokenSerializer(application_token).data
|
||||
return response.Ok(access_token_data)
|
||||
|
||||
# POST method disabled
|
||||
def create(self, *args, **kwargs):
|
||||
raise exc.NotSupported()
|
||||
|
||||
# PATCH and PUT methods disabled
|
||||
def update(self, *args, **kwargs):
|
||||
raise exc.NotSupported()
|
|
@ -0,0 +1,40 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
|
||||
from taiga.base.api.authentication import BaseAuthentication
|
||||
|
||||
from . import services
|
||||
|
||||
class Token(BaseAuthentication):
|
||||
auth_rx = re.compile(r"^Application (.+)$")
|
||||
|
||||
def authenticate(self, request):
|
||||
if "HTTP_AUTHORIZATION" not in request.META:
|
||||
return None
|
||||
|
||||
token_rx_match = self.auth_rx.search(request.META["HTTP_AUTHORIZATION"])
|
||||
if not token_rx_match:
|
||||
return None
|
||||
|
||||
token = token_rx_match.group(1)
|
||||
user = services.get_user_for_application_token(token)
|
||||
|
||||
return (user, token)
|
||||
|
||||
def authenticate_header(self, request):
|
||||
return 'Bearer realm="api"'
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from jwkest.jwk import SYMKey
|
||||
from jwkest.jwe import JWE
|
||||
|
||||
|
||||
def encrypt(content, key):
|
||||
sym_key = SYMKey(key=key, alg="A128KW")
|
||||
jwe = JWE(content, alg="A128KW", enc="A256GCM")
|
||||
return jwe.encrypt([sym_key])
|
||||
|
||||
|
||||
def decrypt(content, key):
|
||||
sym_key = SYMKey(key=key, alg="A128KW")
|
||||
return JWE().decrypt(content, keys=[sym_key])
|
|
@ -0,0 +1,52 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import models, migrations
|
||||
from django.conf import settings
|
||||
import taiga.external_apps.models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Application',
|
||||
fields=[
|
||||
('id', models.CharField(serialize=False, unique=True, max_length=255, default=taiga.external_apps.models._generate_uuid, primary_key=True)),
|
||||
('name', models.CharField(verbose_name='name', max_length=255)),
|
||||
('icon_url', models.TextField(null=True, blank=True, verbose_name='Icon url')),
|
||||
('web', models.CharField(null=True, blank=True, max_length=255, verbose_name='web')),
|
||||
('description', models.TextField(null=True, blank=True, verbose_name='description')),
|
||||
('next_url', models.TextField(verbose_name='Next url')),
|
||||
('key', models.TextField(verbose_name='secret key for cyphering the application tokens')),
|
||||
],
|
||||
options={
|
||||
'verbose_name_plural': 'applications',
|
||||
'verbose_name': 'application',
|
||||
'ordering': ['name'],
|
||||
},
|
||||
bases=(models.Model,),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='ApplicationToken',
|
||||
fields=[
|
||||
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
|
||||
('auth_code', models.CharField(null=True, blank=True, max_length=255, default=None)),
|
||||
('token', models.CharField(null=True, blank=True, max_length=255, default=None)),
|
||||
('state', models.CharField(null=True, blank=True, max_length=255, default='')),
|
||||
('application', models.ForeignKey(verbose_name='application', related_name='application_tokens', to='external_apps.Application')),
|
||||
('user', models.ForeignKey(verbose_name='user', related_name='application_tokens', to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
},
|
||||
bases=(models.Model,),
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name='applicationtoken',
|
||||
unique_together=set([('application', 'user')]),
|
||||
),
|
||||
]
|
|
@ -0,0 +1,85 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from . import services
|
||||
|
||||
import uuid
|
||||
|
||||
def _generate_uuid():
|
||||
return str(uuid.uuid1())
|
||||
|
||||
|
||||
class Application(models.Model):
|
||||
id = models.CharField(primary_key=True, max_length=255, unique=True, default=_generate_uuid)
|
||||
|
||||
name = models.CharField(max_length=255, null=False, blank=False,
|
||||
verbose_name=_("name"))
|
||||
|
||||
icon_url = models.TextField(null=True, blank=True, verbose_name=_("Icon url"))
|
||||
web = models.CharField(max_length=255, null=True, blank=True, verbose_name=_("web"))
|
||||
description = models.TextField(null=True, blank=True, verbose_name=_("description"))
|
||||
|
||||
next_url = models.TextField(null=False, blank=False, verbose_name=_("Next url"))
|
||||
|
||||
key = models.TextField(null=False, blank=False, verbose_name=_("secret key for cyphering the application tokens"))
|
||||
|
||||
class Meta:
|
||||
verbose_name = "application"
|
||||
verbose_name_plural = "applications"
|
||||
ordering = ["name"]
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class ApplicationToken(models.Model):
|
||||
user = models.ForeignKey(settings.AUTH_USER_MODEL, blank=False, null=False,
|
||||
related_name="application_tokens",
|
||||
verbose_name=_("user"))
|
||||
|
||||
application = models.ForeignKey("Application", blank=False, null=False,
|
||||
related_name="application_tokens",
|
||||
verbose_name=_("application"))
|
||||
|
||||
auth_code = models.CharField(max_length=255, null=True, blank=True, default=None)
|
||||
token = models.CharField(max_length=255, null=True, blank=True, default=None)
|
||||
# An unguessable random string. It is used to protect against cross-site request forgery attacks.
|
||||
state = models.CharField(max_length=255, null=True, blank=True, default="")
|
||||
|
||||
class Meta:
|
||||
unique_together = ("application", "user",)
|
||||
|
||||
def __str__(self):
|
||||
return "{application}: {user} - {token}".format(application=self.application.name, user=self.user.get_full_name(), token=self.token)
|
||||
|
||||
@property
|
||||
def cyphered_token(self):
|
||||
return services.cypher_token(self)
|
||||
|
||||
@property
|
||||
def next_url(self):
|
||||
return "{url}?auth_code={auth_code}".format(url=self.application.next_url, auth_code=self.auth_code)
|
||||
|
||||
def update_auth_code(self):
|
||||
self.auth_code = _generate_uuid()
|
||||
|
||||
def generate_token(self):
|
||||
self.auth_code = None
|
||||
self.token = _generate_uuid()
|
|
@ -0,0 +1,43 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from taiga.base.api.permissions import TaigaResourcePermission
|
||||
from taiga.base.api.permissions import IsAuthenticated
|
||||
from taiga.base.api.permissions import PermissionComponent
|
||||
|
||||
|
||||
class ApplicationPermission(TaigaResourcePermission):
|
||||
retrieve_perms = IsAuthenticated()
|
||||
token_perms = IsAuthenticated()
|
||||
list_perms = IsAuthenticated()
|
||||
|
||||
|
||||
class CanUseToken(PermissionComponent):
|
||||
def check_permissions(self, request, view, obj=None):
|
||||
if not obj:
|
||||
return False
|
||||
|
||||
return request.user == obj.user
|
||||
|
||||
|
||||
class ApplicationTokenPermission(TaigaResourcePermission):
|
||||
retrieve_perms = IsAuthenticated() & CanUseToken()
|
||||
by_application_perms = IsAuthenticated()
|
||||
create_perms = IsAuthenticated()
|
||||
update_perms = IsAuthenticated() & CanUseToken()
|
||||
partial_update_perms = IsAuthenticated() & CanUseToken()
|
||||
destroy_perms = IsAuthenticated() & CanUseToken()
|
||||
list_perms = IsAuthenticated()
|
|
@ -0,0 +1,56 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
|
||||
from taiga.base.api import serializers
|
||||
|
||||
from . import models
|
||||
from . import services
|
||||
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
|
||||
class ApplicationSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
model = models.Application
|
||||
fields = ("id", "name", "web", "description", "icon_url")
|
||||
|
||||
|
||||
class ApplicationTokenSerializer(serializers.ModelSerializer):
|
||||
cyphered_token = serializers.CharField(source="cyphered_token", read_only=True)
|
||||
next_url = serializers.CharField(source="next_url", read_only=True)
|
||||
application = ApplicationSerializer(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = models.ApplicationToken
|
||||
fields = ("user", "id", "application", "auth_code", "next_url")
|
||||
|
||||
|
||||
class AuthorizationCodeSerializer(serializers.ModelSerializer):
|
||||
next_url = serializers.CharField(source="next_url", read_only=True)
|
||||
class Meta:
|
||||
model = models.ApplicationToken
|
||||
fields = ("auth_code", "state", "next_url")
|
||||
|
||||
|
||||
class AccessTokenSerializer(serializers.ModelSerializer):
|
||||
cyphered_token = serializers.CharField(source="cyphered_token", read_only=True)
|
||||
next_url = serializers.CharField(source="next_url", read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = models.ApplicationToken
|
||||
fields = ("cyphered_token", )
|
|
@ -0,0 +1,54 @@
|
|||
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
||||
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
||||
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from taiga.base import exceptions as exc
|
||||
from taiga.base.api.utils import get_object_or_404
|
||||
|
||||
from django.apps import apps
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from . import encryption
|
||||
|
||||
import json
|
||||
|
||||
def get_user_for_application_token(token:str) -> object:
|
||||
"""
|
||||
Given an application token it tries to find an associated user
|
||||
"""
|
||||
app_token = apps.get_model("external_apps", "ApplicationToken").objects.filter(token=token).first()
|
||||
if not app_token:
|
||||
raise exc.NotAuthenticated(_("Invalid token"))
|
||||
return app_token.user
|
||||
|
||||
|
||||
def authorize_token(application_id:int, user:object, state:str) -> object:
|
||||
ApplicationToken = apps.get_model("external_apps", "ApplicationToken")
|
||||
Application = apps.get_model("external_apps", "Application")
|
||||
application = get_object_or_404(Application, id=application_id)
|
||||
token, _ = ApplicationToken.objects.get_or_create(user=user, application=application)
|
||||
token.update_auth_code()
|
||||
token.state = state
|
||||
token.save()
|
||||
return token
|
||||
|
||||
|
||||
def cypher_token(application_token:object) -> str:
|
||||
content = {
|
||||
"token": application_token.token
|
||||
}
|
||||
|
||||
return encryption.encrypt(json.dumps(content), application_token.application.key)
|
|
@ -213,6 +213,13 @@ router.register(r"importer", ProjectImporterViewSet, base_name="importer")
|
|||
router.register(r"exporter", ProjectExporterViewSet, base_name="exporter")
|
||||
|
||||
|
||||
# External apps
|
||||
from taiga.external_apps.api import Application, ApplicationToken
|
||||
router.register(r"applications", Application, base_name="applications")
|
||||
router.register(r"application-tokens", ApplicationToken, base_name="application-tokens")
|
||||
|
||||
|
||||
|
||||
# Stats
|
||||
# - see taiga.stats.routers and taiga.stats.apps
|
||||
|
||||
|
|
|
@ -482,6 +482,21 @@ class HistoryEntryFactory(Factory):
|
|||
type = 1
|
||||
|
||||
|
||||
class ApplicationFactory(Factory):
|
||||
class Meta:
|
||||
model = "external_apps.Application"
|
||||
strategy = factory.CREATE_STRATEGY
|
||||
|
||||
key = "testingkey"
|
||||
|
||||
class ApplicationTokenFactory(Factory):
|
||||
class Meta:
|
||||
model = "external_apps.ApplicationToken"
|
||||
strategy = factory.CREATE_STRATEGY
|
||||
|
||||
application = factory.SubFactory("tests.factories.ApplicationFactory")
|
||||
user = factory.SubFactory("tests.factories.UserFactory")
|
||||
|
||||
def create_issue(**kwargs):
|
||||
"Create an issue and along with its dependencies."
|
||||
owner = kwargs.pop("owner", None)
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
from django.core.urlresolvers import reverse
|
||||
|
||||
from taiga.base.utils import json
|
||||
from tests import factories as f
|
||||
from tests.utils import helper_test_http_method, disconnect_signals, reconnect_signals
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def setup_module(module):
|
||||
disconnect_signals()
|
||||
|
||||
|
||||
def teardown_module(module):
|
||||
reconnect_signals()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data():
|
||||
m = type("Models", (object,), {})
|
||||
m.registered_user = f.UserFactory.create()
|
||||
m.token = f.ApplicationTokenFactory(state="random-state")
|
||||
m.registered_user_with_token = m.token.user
|
||||
return m
|
||||
|
||||
|
||||
def test_application_tokens_create(client, data):
|
||||
url = reverse('application-tokens-list')
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
data = json.dumps({"application": data.token.application.id})
|
||||
results = helper_test_http_method(client, "post", url, data, users)
|
||||
assert results == [405, 405, 405]
|
||||
|
||||
|
||||
def test_applications_retrieve_token(client, data):
|
||||
url=reverse('applications-token', kwargs={"pk": data.token.application.id})
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
results = helper_test_http_method(client, "get", url, None, users)
|
||||
assert results == [401, 200, 200]
|
||||
|
||||
|
||||
def test_application_tokens_retrieve(client, data):
|
||||
url = reverse('application-tokens-detail', kwargs={"pk": data.token.id})
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
results = helper_test_http_method(client, "get", url, None, users)
|
||||
assert results == [401, 404, 200]
|
||||
|
||||
|
||||
def test_application_tokens_authorize(client, data):
|
||||
url=reverse('application-tokens-authorize')
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
data = json.dumps({
|
||||
"application": data.token.application.id,
|
||||
"state": "random-state-123123",
|
||||
})
|
||||
|
||||
results = helper_test_http_method(client, "post", url, data, users)
|
||||
assert results == [401, 200, 200]
|
||||
|
||||
|
||||
def test_application_tokens_validate(client, data):
|
||||
url=reverse('application-tokens-validate')
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
data = json.dumps({
|
||||
"application": data.token.application.id,
|
||||
"key": data.token.application.key,
|
||||
"auth_code": data.token.auth_code,
|
||||
"state": data.token.state
|
||||
})
|
||||
|
||||
results = helper_test_http_method(client, "post", url, data, users)
|
||||
assert results == [200, 200, 200]
|
||||
|
||||
|
||||
def test_application_tokens_update(client, data):
|
||||
url = reverse('application-tokens-detail', kwargs={"pk": data.token.id})
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
patch_data = json.dumps({"application": data.token.application.id})
|
||||
results = helper_test_http_method(client, "patch", url, patch_data, users)
|
||||
assert results == [405, 405, 405]
|
||||
|
||||
|
||||
def test_application_tokens_delete(client, data):
|
||||
url = reverse('application-tokens-detail', kwargs={"pk": data.token.id})
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
results = helper_test_http_method(client, "delete", url, None, users)
|
||||
assert results == [401, 403, 204]
|
||||
|
||||
|
||||
def test_application_tokens_list(client, data):
|
||||
url = reverse('application-tokens-list')
|
||||
|
||||
users = [
|
||||
None,
|
||||
data.registered_user,
|
||||
data.registered_user_with_token
|
||||
]
|
||||
|
||||
results = helper_test_http_method(client, "get", url, None, users)
|
||||
assert results == [401, 200, 200]
|
|
@ -0,0 +1,102 @@
|
|||
from django.core.urlresolvers import reverse
|
||||
|
||||
from taiga.external_apps import encryption
|
||||
from taiga.external_apps import models
|
||||
|
||||
|
||||
from .. import factories as f
|
||||
|
||||
import json
|
||||
import pytest
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_own_tokens_listing(client):
|
||||
user_1 = f.UserFactory.create()
|
||||
user_2 = f.UserFactory.create()
|
||||
token_1 = f.ApplicationTokenFactory(user=user_1)
|
||||
token_2 = f.ApplicationTokenFactory(user=user_2)
|
||||
url = reverse("application-tokens-list")
|
||||
client.login(user_1)
|
||||
response = client.json.get(url)
|
||||
assert response.status_code == 200
|
||||
assert len(response.data) == 1
|
||||
assert response.data[0].get("id") == token_1.id
|
||||
assert response.data[0].get("application").get("id") == token_1.application.id
|
||||
|
||||
|
||||
def test_retrieve_existing_token_for_application(client):
|
||||
token = f.ApplicationTokenFactory()
|
||||
url = reverse("applications-token", args=[token.application.id])
|
||||
client.login(token.user)
|
||||
response = client.json.get(url)
|
||||
assert response.status_code == 200
|
||||
assert response.data.get("application").get("id") == token.application.id
|
||||
|
||||
|
||||
|
||||
def test_retrieve_unexisting_token_for_application(client):
|
||||
user = f.UserFactory.create()
|
||||
url = reverse("applications-token", args=[-1])
|
||||
client.login(user)
|
||||
response = client.json.get(url)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_token_authorize(client):
|
||||
user = f.UserFactory.create()
|
||||
application = f.ApplicationFactory()
|
||||
url = reverse("application-tokens-authorize")
|
||||
client.login(user)
|
||||
|
||||
data = json.dumps({
|
||||
"application": application.id,
|
||||
"state": "random-state"
|
||||
})
|
||||
|
||||
response = client.json.post(url, data)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.data["state"] == "random-state"
|
||||
auth_code_1 = response.data["auth_code"]
|
||||
|
||||
response = client.json.post(url, data)
|
||||
assert response.status_code == 200
|
||||
assert response.data["state"] == "random-state"
|
||||
auth_code_2 = response.data["auth_code"]
|
||||
assert auth_code_1 != auth_code_2
|
||||
|
||||
|
||||
def test_token_authorize_invalid_app(client):
|
||||
user = f.UserFactory.create()
|
||||
url = reverse("application-tokens-authorize")
|
||||
client.login(user)
|
||||
|
||||
data = json.dumps({
|
||||
"application": 33,
|
||||
"state": "random-state"
|
||||
})
|
||||
|
||||
response = client.json.post(url, data)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_token_validate(client):
|
||||
user = f.UserFactory.create()
|
||||
application = f.ApplicationFactory(next_url="http://next.url")
|
||||
token = f.ApplicationTokenFactory(auth_code="test-auth-code", state="test-state", application=application)
|
||||
url = reverse("application-tokens-validate")
|
||||
client.login(user)
|
||||
|
||||
data = {
|
||||
"application": token.application.id,
|
||||
"auth_code": "test-auth-code",
|
||||
"state": "test-state"
|
||||
}
|
||||
response = client.json.post(url, json.dumps(data))
|
||||
assert response.status_code == 200
|
||||
|
||||
token = models.ApplicationToken.objects.get(id=token.id)
|
||||
decyphered_token = encryption.decrypt(response.data["cyphered_token"], token.application.key)[0]
|
||||
decyphered_token = json.loads(decyphered_token.decode("utf-8"))
|
||||
assert decyphered_token["token"] == token.token
|
Loading…
Reference in New Issue