Refactorized auth controllers.

Now controllers only have presentation logic. All domain
logic is moved to separate transactional service functions.
remotes/origin/enhancement/email-actions
Andrey Antukh 2014-04-17 00:24:23 +02:00 committed by Jesús Espino
parent 9d41a48a46
commit b7df530546
24 changed files with 424 additions and 185 deletions

View File

@ -16,3 +16,6 @@ django-jinja>=0.23
jinja2==2.7.1
pygments>=1.6
django-sites==0.4
# Uncomment it if you are using python < 3.4
#enum34==0.9.23

View File

@ -1,137 +1,109 @@
# -*- coding: utf-8 -*-
from functools import partial
from enum import Enum
from django.db.models.loading import get_model
from django.db.models import Q
from django.contrib.auth import logout, login, authenticate
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as _
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from rest_framework import status, viewsets
from rest_framework import status
from rest_framework import viewsets
from rest_framework import serializers
from taiga.base.decorators import list_route
from taiga.domains.models import DomainMember
from taiga.domains import get_active_domain
from taiga.base.users.models import User, Role
from taiga.base.users.serializers import UserSerializer
from taiga.base import exceptions as exc
from taiga.base import auth
from taiga.users.services import get_and_validate_user
from taiga.domains.services import is_public_register_enabled_for_domain
from .serializers import (PublicRegisterSerializer,
PrivateRegisterSerializer,
PrivateGenericRegisterSerializer,
PrivateRegisterExistingSerializer)
from .serializers import PublicRegisterSerializer
from .serializers import PrivateRegisterForExistingUserSerializer
from .serializers import PrivateRegisterForNewUserSerializer
from .services import private_register_for_existing_user
from .services import private_register_for_new_user
from .services import public_register
from .services import make_auth_response_data
def _parse_data(data:dict, *, cls):
"""
Generic function for parse user data using
specified serializer on `cls` keyword parameter.
Raises: RequestValidationError exception if
some errors found when data is validated.
Returns the parsed data.
"""
serializer = cls(data=data)
if not serializer.is_valid():
raise exc.RequestValidationError(serializer.errors)
return serializer.data
# Parse public register data
parse_public_register_data = partial(_parse_data, cls=PublicRegisterSerializer)
# Parse private register data for existing user
parse_private_register_for_existing_user_data = \
partial(_parse_data, cls=PrivateRegisterForExistingUserSerializer)
# Parse private register data for new user
parse_private_register_for_new_user_data = \
partial(_parse_data, cls=PrivateRegisterForNewUserSerializer)
class RegisterTypeEnum(Enum):
new_user = 1
existing_user = 2
def parse_register_type(userdata:dict) -> str:
"""
Parses user data and detects that register type is.
It returns RegisterTypeEnum value.
"""
# Create adhoc inner serializer for avoid parse
# manually the user data.
class _serializer(serializers.Serializer):
existing = serializers.BooleanField()
instance = _serializer(data=userdata)
if not instance.is_valid():
raise exc.RequestValidationError(instance.errors)
if instance.data["existing"]:
return RegisterTypeEnum.existing_user
return RegisterTypeEnum.new_user
class AuthViewSet(viewsets.ViewSet):
permission_classes = (AllowAny,)
def _create_response(self, user):
serializer = UserSerializer(user)
response_data = serializer.data
domain = get_active_domain()
response_data['is_site_owner'] = domain.user_is_owner(user)
response_data['is_site_staff'] = domain.user_is_staff(user)
response_data["auth_token"] = auth.get_token_for_user(user)
return response_data
def _create_domain_member(self, user):
domain = get_active_domain()
if domain.members.filter(user=user).count() == 0:
domain_member = DomainMember(domain=domain, user=user, email=user.email,
is_owner=False, is_staff=False)
domain_member.save()
def _send_public_register_email(self, user):
context = {"user": user}
mbuilder = MagicMailBuilder()
email = mbuilder.public_register_user(user.email, context)
email.send()
def _public_register(self, request):
if not request.domain.public_register:
if not is_public_register_enabled_for_domain(request.domain):
raise exc.BadRequest(_("Public register is disabled for this domain."))
serializer = PublicRegisterSerializer(data=request.DATA)
if not serializer.is_valid():
raise exc.BadRequest(serializer.errors)
try:
data = parse_public_register_data(request.DATA)
user = public_register(request.domain, **data)
except exc.IntegrityError as e:
raise exc.BadRequest(e.detail)
data = serializer.data
if User.objects.filter(Q(username=data["username"]) | Q(email=data["email"])).exists():
raise exc.BadRequest(_("This username or email is already in use."))
user = User(username=data["username"],
first_name=data["first_name"],
last_name=data["last_name"],
email=data["email"])
user.set_password(data["password"])
user.save()
self._create_domain_member(user)
#self._send_public_register_email(user)
response_data = self._create_response(user)
return Response(response_data, status=status.HTTP_201_CREATED)
def _send_private_register_email(self, user, **kwargs):
context = {"user": user}
context.update(kwargs)
mbuilder = MagicMailBuilder()
email = mbuilder.private_register_user(user.email, context)
email.send()
data = make_auth_response_data(request.domain, user)
return Response(data, status=status.HTTP_201_CREATED)
def _private_register(self, request):
base_serializer = PrivateGenericRegisterSerializer(data=request.DATA)
if not base_serializer.is_valid():
raise exc.BadRequest(base_serializer.errors)
membership_model = get_model("projects", "Membership")
try:
membership = membership_model.objects.get(token=base_serializer.data["token"])
except membership_model.DoesNotExist as e:
raise exc.BadRequest(_("Invalid token")) from e
if base_serializer.data["existing"]:
serializer = PrivateRegisterExistingSerializer(data=request.DATA)
if not serializer.is_valid():
raise exc.BadRequest(serializer.errors)
user = get_object_or_404(User, username=serializer.data["username"])
if not user.check_password(serializer.data["password"]):
raise exc.BadRequest({"password": _("Incorrect password")})
register_type = parse_register_type(request.DATA)
if register_type is RegisterTypeEnum.existing_user:
data = parse_private_register_for_existing_user_data(request.DATA)
user = private_register_for_existing_user(request.domain, **data)
else:
serializer = PrivateRegisterSerializer(data=request.DATA)
if not serializer.is_valid():
raise exc.BadRequest(serializer.errors)
data = parse_private_register_for_new_user_data(request.DATA)
user = private_register_for_new_user(request.domain, **data)
data = serializer.data
if User.objects.filter(Q(username=data["username"]) | Q(email=data["email"])).exists():
raise exc.BadRequest(_("This username or email is already in use."))
user = User(username=data["username"],
first_name=data["first_name"],
last_name=data["last_name"],
email=data["email"])
user.set_password(data["password"])
user.save()
self._create_domain_member(user)
membership.user = user
membership.save()
#self._send_private_register_email(user, membership=membership)
response_data = self._create_response(user)
return Response(response_data, status=status.HTTP_201_CREATED)
data = make_auth_response_data(request.domain, user)
return Response(data, status=status.HTTP_201_CREATED)
@list_route(methods=["POST"], permission_classes=[AllowAny])
def register(self, request, **kwargs):
@ -140,7 +112,6 @@ class AuthViewSet(viewsets.ViewSet):
return self._public_register(request)
elif type == "private":
return self._private_register(request)
raise exc.BadRequest(_("invalid register type"))
# Login view: /api/v1/auth
@ -148,13 +119,6 @@ class AuthViewSet(viewsets.ViewSet):
username = request.DATA.get('username', None)
password = request.DATA.get('password', None)
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise exc.BadRequest(_("Invalid username or password"))
if not user.check_password(password):
raise exc.BadRequest(_("Invalid username or password"))
response_data = self._create_response(user)
return Response(response_data, status=status.HTTP_200_OK)
user = get_and_validate_user(username=username, password=password)
data = make_auth_response_data(request.domain, user)
return Response(data, status=status.HTTP_200_OK)

View File

@ -14,16 +14,10 @@ class PublicRegisterSerializer(BaseRegisterSerializer):
pass
class PrivateRegisterSerializer(BaseRegisterSerializer):
pass
class PrivateGenericRegisterSerializer(serializers.Serializer):
class PrivateRegisterForNewUserSerializer(BaseRegisterSerializer):
token = serializers.CharField(max_length=255, required=True)
existing = serializers.BooleanField()
# existing = serializers.ChoiceField(choices=[("on", "on"), ("off", "off")])
class PrivateRegisterExistingSerializer(serializers.Serializer):
class PrivateRegisterForExistingUserSerializer(serializers.Serializer):
username = serializers.CharField(max_length=200)
password = serializers.CharField(min_length=4)
token = serializers.CharField(max_length=255, required=True)

179
taiga/auth/services.py Normal file
View File

@ -0,0 +1,179 @@
"""
This module contains a domain logic for authentication
process. It called services because in DDD says it.
NOTE: Python doesn't have java limitations for "everytghing
should be contained in a class". Because of that, it
not uses clasess and uses simple functions.
"""
from django.db.models.loading import get_model
from django.db.models import Q
from django.db import transaction as tx
from django.db import IntegrityError
from django.utils.translation import ugettext as _
from djmail.template_mail import MagicMailBuilder
from taiga.base import exceptions as exc
from taiga.users.serializers import UserSerializer
from taiga.users.services import get_and_validate_user
from taiga.domains.services import (create_domain_member,
is_user_exists_on_domain)
from .backends import get_token_for_user
def send_public_register_email(user) -> bool:
"""
Given a user, send public register welcome email
message to specified user.
"""
context = {"user": user}
mbuilder = MagicMailBuilder()
email = mbuilder.public_register_user(user.email, context)
return bool(email.send())
def send_private_register_email(user, **kwargs) -> bool:
"""
Given a user, send private register welcome
email message to specified user.
"""
context = {"user": user}
context.update(kwargs)
mbuilder = MagicMailBuilder()
email = mbuilder.private_register_user(user.email, context)
return bool(email.send())
def is_user_already_registred(*, username:str, email:str) -> bool:
"""
Checks if a specified user is already registred.
"""
user_model = get_model("users", "User")
qs = user_model.objects.filter(Q(username=username) |
Q(email=email))
return qs.exists()
def get_membership_by_token(token:str):
"""
Given a token, returns a membership instance
that matches with specified token.
If not matches with any membership NotFound exception
is raised.
"""
membership_model = get_model("projects", "Membership")
qs = membership_model.objects.filter(token=token)
if len(qs) == 0:
raise exc.NotFound("Token not matches any member.")
return qs[0]
@tx.atomic
def public_register(domain, *, username:str, password:str,
email:str, first_name:str, last_name:str):
"""
Given a parsed parameters, try register a new user
knowing that it follows a public register flow.
This can raise `exc.IntegrityError` exceptions in
case of conflics found.
:returns: User
"""
if is_user_already_registred(username=username, email=email):
raise exc.IntegrityError("User is already registred.")
user_model = get_model("users", "User")
user = user_model(username=username,
email=email,
first_name=first_name,
last_name=last_name)
user.set_password(password)
user.save()
if not is_user_exists_on_domain(domain, user):
create_domain_member(domain, user)
# send_public_register_email(user)
return user
@tx.atomic
def private_register_for_existing_user(domain, *, token:str, username:str, password:str):
"""
Register works not only for register users, also serves for accept
inviatations for projects as existing user.
Given a invitation token with parsed parameters, accept inviation
as existing user.
"""
user = get_and_validate_user(username=username, password=password)
membership = get_membership_by_token(token)
if not is_user_exists_on_domain(domain, user):
create_domain_member(domain, user)
membership.user = user
membership.save(update_fields=["user"])
# send_private_register_email(user)
return user
@tx.atomic
def private_register_for_new_user(domain, *, token:str, username:str, email:str,
first_name:str, last_name:str, password:str):
"""
Given a inviation token, try register new user matching
the invitation token.
"""
user_model = get_model("users", "User")
if is_user_already_registred(username=username, email=email):
raise exc.WrongArguments(_("Username or Email is already in use."))
user = user_model(username=username,
email=email,
first_name=first_name,
last_name=last_name)
user.set_password(password)
try:
user.save()
except IntegrityError:
raise exc.IntegrityError(_("Error on creating new user."))
if not is_user_exists_on_domain(domain, user):
create_domain_member(domain, user)
membership = get_membership_by_token(token)
membership.user = user
membership.save(update_fields=["user"])
return user
def make_auth_response_data(domain, user) -> dict:
"""
Given a domain and user, creates data structure
using python dict containing a representation
of the logged user.
"""
serializer = UserSerializer(user)
data = dict(serializer.data)
data['is_site_owner'] = domain.user_is_owner(user)
data['is_site_staff'] = domain.user_is_staff(user)
data["auth_token"] = get_token_for_user(user)
return data

View File

@ -14,16 +14,19 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from taiga import urls
from taiga.base import auth
from taiga.base.users.tests import create_user, create_domain
from taiga.users.tests import create_user, create_domain
from taiga.projects.tests import create_project
from taiga.domains.models import Domain, DomainMember
from taiga.projects.models import Membership
from taiga.auth.backends import Token as TokenAuthBackend
from taiga.auth.backends import get_token_for_user
class TestAuthView(viewsets.ViewSet):
authentication_classes = (auth.Token,)
authentication_classes = (TokenAuthBackend,)
permission_classes = (IsAuthenticated,)
def get(self, request, *args, **kwargs):
@ -37,6 +40,7 @@ urls.urlpatterns += patterns("",
class TokenAuthTests(test.TestCase):
fixtures = ["initial_domains.json",]
def setUp(self):
self.user1 = create_user(1)
@ -45,9 +49,9 @@ class TokenAuthTests(test.TestCase):
self.assertEqual(response.status_code, 401)
def test_token_auth_02(self):
token = auth.get_token_for_user(self.user1)
token = get_token_for_user(self.user1)
response = self.client.get(reverse("test-token-auth"),
HTTP_AUTHORIZATION="Bearer {}".format(token))
HTTP_AUTHORIZATION="Bearer {}".format(token))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b'"ok"')

View File

@ -14,7 +14,7 @@ from .utils.json import to_json
class BaseException(exceptions.APIException):
status_code = status.HTTP_400_BAD_REQUEST
default_detail = _('Unexpected error')
default_detail = _("Unexpected error")
def __init__(self, detail=None):
self.detail = detail or self.default_detail
@ -26,7 +26,7 @@ class NotFound(BaseException):
"""
status_code = status.HTTP_404_NOT_FOUND
default_detail = _('Not found.')
default_detail = _("Not found.")
class NotSupported(BaseException):
@ -39,7 +39,7 @@ class BadRequest(BaseException):
Exception used on bad arguments detected
on api view.
"""
default_detail = _('Wrong arguments.')
default_detail = _("Wrong arguments.")
class WrongArguments(BaseException):
@ -47,7 +47,11 @@ class WrongArguments(BaseException):
Exception used on bad arguments detected
on service. This is same as `BadRequest`.
"""
default_detail = _('Wrong arguments.')
default_detail = _("Wrong arguments.")
class RequestValidationError(BaseException):
default_detail = _("Data validation error")
class PermissionDenied(exceptions.PermissionDenied):
@ -58,6 +62,11 @@ class PermissionDenied(exceptions.PermissionDenied):
pass
class IntegrityError(BaseException):
status_code = status.HTTP_400_BAD_REQUEST
default_detail = _("Integrity Error for wrong or invalid arguments")
class PreconditionError(BaseException):
"""
Error raised on precondition method on viewset.
@ -108,20 +117,20 @@ def exception_handler(exc):
if isinstance(exc, exceptions.APIException):
headers = {}
if getattr(exc, 'auth_header', None):
headers['WWW-Authenticate'] = exc.auth_header
if getattr(exc, 'wait', None):
headers['X-Throttle-Wait-Seconds'] = '%d' % exc.wait
if getattr(exc, "auth_header", None):
headers["WWW-Authenticate"] = exc.auth_header
if getattr(exc, "wait", None):
headers["X-Throttle-Wait-Seconds"] = "%d" % exc.wait
detail = format_exception(exc)
return Response(detail, status=exc.status_code, headers=headers)
elif isinstance(exc, Http404):
return Response({'_error_message': _('Not found')},
return Response({"_error_message": _("Not found")},
status=status.HTTP_404_NOT_FOUND)
elif isinstance(exc, DjangoPermissionDenied):
return Response({'_error_message': _('Permission denied')},
return Response({"_error_message": _("Permission denied")},
status=status.HTTP_403_FORBIDDEN)
# Note: Unhandled exceptions will raise a 500 error.

View File

@ -13,7 +13,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers
from taiga.base.users.serializers import UserSerializer
from taiga.users.serializers import UserSerializer
from .models import Domain, DomainMember

49
taiga/domains/services.py Normal file
View File

@ -0,0 +1,49 @@
"""
This module contains a domain logic for domains application.
"""
from django.db.models.loading import get_model
from django.db import transaction as tx
from django.db import IntegrityError
from taiga.base import exceptions as exc
def is_user_exists_on_domain(domain, user) -> bool:
"""
Checks if user is alredy exists on domain.
"""
return domain.members.filter(user=user).exists()
def is_public_register_enabled_for_domain(domain) -> bool:
"""
Checks if a specified domain have public register
activated.
The implementation is very simple but it encapsulates
request attribute access into more semantic function
call.
"""
return domain.public_register
@tx.atomic
def create_domain_member(domain, user):
"""
Given a domain and user, add user as member to
specified domain.
:returns: DomainMember
"""
domain_member_model = get_model("domains", "DomainMember")
try:
domain_member = domain_member_model(domain=domain, user=user,
email=user.email, is_owner=False,
is_staff=False)
domain_member.save()
except IntegrityError:
raise exc.IntegrityError("User is already member in a site")
return domain_member

View File

@ -18,7 +18,7 @@ from django.http import HttpResponse
from taiga.projects.tests import create_project
from taiga.projects.issues.tests import create_issue
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from . import middleware as mw
from . import changes as ch

View File

@ -18,7 +18,7 @@ from django.contrib import admin
from django.contrib.contenttypes import generic
from taiga.projects.milestones.admin import MilestoneInline
from taiga.base.users.admin import RoleInline
from taiga.users.admin import RoleInline
from . import models
import reversion

View File

@ -32,8 +32,9 @@ from taiga.base import filters
from taiga.base import exceptions as exc
from taiga.base.decorators import list_route, detail_route
from taiga.base.permissions import has_project_perm
from taiga.base.api import ModelCrudViewSet, RetrieveModelMixin
from taiga.base.users.models import Role
from taiga.base.api import ModelCrudViewSet, ModelListViewSet, RetrieveModelMixin
from taiga.users.models import Role
from taiga.projects.aggregates.tags import get_all_tags
from . import serializers
from . import models

View File

@ -6,7 +6,7 @@ from django import test
from django.core import mail
from django.core.urlresolvers import reverse
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project, add_membership
from taiga.projects.milestones.tests import create_milestone
from taiga.projects.issues.models import Issue

View File

@ -23,7 +23,7 @@ from django.contrib.contenttypes.models import ContentType
from sampledatahelper.helper import SampleDataHelper
from taiga.base.users.models import *
from taiga.users.models import *
from taiga.projects.models import *
from taiga.projects.milestones.models import *
from taiga.projects.userstories.models import *

View File

@ -6,7 +6,7 @@ from django import test
from django.core import mail
from django.core.urlresolvers import reverse
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project, add_membership
from taiga.projects.milestones.models import Milestone

View File

@ -32,14 +32,19 @@ from django.utils import timezone
from picklefield.fields import PickledObjectField
from taiga.users.models import Role
from taiga.domains.models import DomainMember
from taiga.projects.userstories.models import UserStory
from taiga.base.utils.slug import slugify_uniquely
from taiga.base.utils.dicts import dict_sum
from taiga.base.users.models import Role
from . import choices
# FIXME: this should to be on choices module (?)
VIDEOCONFERENCES_CHOICES = (
('appear-in', 'AppearIn'),
('talky', 'Talky'),
)
class Membership(models.Model):
# This model stores all project memberships. Also

View File

@ -14,15 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from os import path
from rest_framework import serializers
from taiga.base.serializers import PickleField
from taiga.base.users.models import Role
from taiga.users.models import Role
from . import models
from os import path
class AttachmentSerializer(serializers.ModelSerializer):
name = serializers.SerializerMethodField("get_name")

View File

@ -8,7 +8,7 @@ from django.core.urlresolvers import reverse
import reversion
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project, add_membership
from taiga.projects.milestones.tests import create_milestone
from taiga.projects.userstories.tests import create_userstory

View File

@ -7,10 +7,12 @@ from django.core.urlresolvers import reverse
from django.core import mail
from django.db.models import get_model
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.models import Project, Membership
from . import create_project, add_membership
from . import create_project
from . import add_membership
class ProfileTestCase(test.TestCase):
fixtures = ["initial_domains.json"]

View File

@ -7,12 +7,13 @@ from django.core.urlresolvers import reverse
from django.core import mail
from django.db.models import get_model
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.models import Project, Membership
from taiga.projects.issues.tests import create_issue
from taiga.projects.tasks.tests import create_task
from . import create_project, add_membership
from . import create_project
from . import add_membership
class AllProjectEventsNotificationsTestCase(test.TestCase):
fixtures = ["initial_domains.json"]

View File

@ -4,7 +4,7 @@ from django import test
from django.core import mail
from django.core.urlresolvers import reverse
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project, add_membership
from taiga.projects.milestones.tests import create_milestone
from taiga.projects.userstories.models import UserStory

View File

@ -1,15 +1,12 @@
# -*- coding: utf-8 -*-
import json
from django import test
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project
from . import create_userstory
from .. import services
from .. import models
from . import create_userstory
class UserStoriesServiceTestCase(test.TestCase):

View File

@ -6,7 +6,7 @@ from django import test
from django.core import mail
from django.core.urlresolvers import reverse
from taiga.base.users.tests import create_user
from taiga.users.tests import create_user
from taiga.projects.tests import create_project, add_membership
from taiga.projects.wiki.models import WikiPage

View File

@ -16,32 +16,37 @@
from taiga.base import routers
from taiga.auth.api import AuthViewSet
from taiga.users.api import UsersViewSet, PermissionsViewSet
from taiga.base.searches.api import SearchViewSet
from taiga.base.resolver.api import ResolverViewSet
from taiga.projects.api import (ProjectViewSet, MembershipViewSet, InvitationViewSet,
UserStoryStatusViewSet, PointsViewSet, TaskStatusViewSet,
IssueStatusViewSet, IssueTypeViewSet, PriorityViewSet,
SeverityViewSet, ProjectAdminViewSet, RolesViewSet) #, QuestionStatusViewSet)
from taiga.domains.api import DomainViewSet, DomainMembersViewSet
from taiga.projects.milestones.api import MilestoneViewSet
from taiga.projects.userstories.api import UserStoryViewSet, UserStoryAttachmentViewSet
from taiga.projects.tasks.api import TaskViewSet, TaskAttachmentViewSet
from taiga.projects.issues.api import IssueViewSet, IssueAttachmentViewSet
#from taiga.projects.questions.api import QuestionViewSet, QuestionAttachmentViewSet
#from taiga.projects.documents.api import DocumentViewSet, DocumentAttachmentViewSet
from taiga.projects.wiki.api import WikiViewSet, WikiAttachmentViewSet
router = routers.DefaultRouter(trailing_slash=False)
# Users & Auth
from taiga.base.users.api import UsersViewSet
from taiga.base.users.api import PermissionsViewSet
from taiga.base.auth.api import AuthViewSet
# taiga.users
router.register(r"users", UsersViewSet, base_name="users")
router.register(r"permissions", PermissionsViewSet, base_name="permissions")
router.register(r"auth", AuthViewSet, base_name="auth")
# Resolver & Search
from taiga.base.resolver.api import ResolverViewSet
from taiga.base.searches.api import SearchViewSet
router.register(r"resolver", ResolverViewSet, base_name="resolver")
router.register(r"search", SearchViewSet, base_name="search")
# Domains
from taiga.domains.api import DomainViewSet
from taiga.domains.api import DomainMembersViewSet
from taiga.projects.api import ProjectAdminViewSet
router.register(r"sites", DomainViewSet, base_name="sites")
router.register(r"site-members", DomainMembersViewSet, base_name="site-members")
router.register(r"site-projects", ProjectAdminViewSet, base_name="site-projects")

27
taiga/users/services.py Normal file
View File

@ -0,0 +1,27 @@
"""
This model contains a domain logic for users application.
"""
from django.db.models.loading import get_model
from taiga.base import exceptions as exc
def get_and_validate_user(*, username:str, password:str) -> bool:
"""
Check if user with username exists and specified
password matchs well with existing user password.
if user is valid, user is returned else, corresponding
exception is raised.
"""
user_model = get_model("users", "User")
qs = user_model.objects.filter(username=username)
if len(qs) == 0:
raise exc.WrongArguments("Username or password does not matches user.")
user = qs[0]
if not user.check_password(password):
raise exc.WrongArguments("Username or password does not matches user.")
return user