Adding import api

remotes/origin/enhancement/email-actions
Jesús Espino 2014-09-01 16:20:02 +02:00
parent 2b36375b6b
commit 6d78c79018
22 changed files with 1474 additions and 255 deletions

View File

@ -290,6 +290,7 @@ REST_FRAMEWORK = {
"PAGINATE_BY": 30,
"PAGINATE_BY_PARAM": "page_size",
"MAX_PAGINATE_BY": 1000,
"DATETIME_FORMAT": "%Y-%m-%dT%H:%M:%S%z"
}
DEFAULT_PROJECT_TEMPLATE = "scrum"

View File

@ -17,8 +17,10 @@
# This code is partially taken from django-rest-framework:
# Copyright (c) 2011-2014, Tom Christie
import json
from django.core.exceptions import PermissionDenied
from django.http import Http404
from django.http import Http404, HttpResponse
from django.utils.datastructures import SortedDict
from django.views.decorators.csrf import csrf_exempt
@ -31,6 +33,9 @@ from rest_framework.utils import formatting
from taiga.base.utils.iterators import as_tuple
from django.conf import settings
from django.views.defaults import server_error
def get_view_name(view_cls, suffix=None):
"""
@ -436,3 +441,10 @@ class APIView(View):
ret['renders'] = [renderer.media_type for renderer in self.renderer_classes]
ret['parses'] = [parser.media_type for parser in self.parser_classes]
return ret
def api_server_error(request, *args, **kwargs):
if settings.DEBUG == False and request.META['CONTENT_TYPE'] == "application/json":
return HttpResponse(json.dumps({"error": "Server application error"}),
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return server_error(request, *args, **kwargs)

View File

@ -0,0 +1,22 @@
from contextlib import contextmanager
@contextmanager
def without_signals(*disablers):
for disabler in disablers:
if not (isinstance(disabler, list) or isinstance(disabler, tuple)) or len(disabler) == 0:
raise ValueError("The parameters must be lists of at least one parameter (the signal)")
signal, *ids = disabler
signal.backup_receivers = signal.receivers
signal.receivers = list(filter(lambda x: x[0][0] not in ids, signal.receivers))
try:
yield
except Exception as e:
raise e
finally:
for disabler in disablers:
signal, *ids = disabler
signal.receivers = signal.backup_receivers

View File

@ -1,65 +1,194 @@
from rest_framework.exceptions import APIException
from rest_framework.response import Response
from rest_framework import status
from django.utils.decorators import method_decorator
from django.db.transaction import atomic
from django.db.models import signals
from taiga.base.api.mixins import CreateModelMixin
from taiga.base.api.viewsets import GenericViewSet
from taiga.base.decorators import detail_route
from taiga.projects.models import Project
from taiga.base.utils.signals import without_signals
from taiga.projects.models import Project, Membership
from . import serializers
from . import service
from . import permissions
from django.db.models import signals
def __disconnect_signals():
signals.pre_save.receivers = []
signals.post_save.receivers = []
class Http400(APIException):
status_code = 400
class ProjectImporterViewSet(CreateModelMixin, GenericViewSet):
model = Project
permission_classes = (permissions.ImportPermission, )
@method_decorator(atomic)
def create(self, request, *args, **kwargs):
self.check_permissions(request, 'import_project', None)
data = request.DATA.copy()
data['owner'] = data.get('owner', request.user.email)
with without_signals((signals.post_save, "project_post_save")):
project_serialized = service.store_project(data)
if project_serialized:
service.store_choices(project_serialized.object, data, "points", project_serialized.object.points, serializers.PointsExportSerializer, "default_points")
service.store_choices(project_serialized.object, data, "issue_types", project_serialized.object.issue_types, serializers.IssueTypeExportSerializer, "default_issue_type")
service.store_choices(project_serialized.object, data, "issue_statuses", project_serialized.object.issue_statuses, serializers.IssueStatusExportSerializer, "default_issue_status")
service.store_choices(project_serialized.object, data, "us_statuses", project_serialized.object.us_statuses, serializers.UserStoryStatusExportSerializer, "default_us_status")
service.store_choices(project_serialized.object, data, "task_statuses", project_serialized.object.task_statuses, serializers.TaskStatusExportSerializer, "default_task_status")
service.store_choices(project_serialized.object, data, "priorities", project_serialized.object.priorities, serializers.PriorityExportSerializer, "default_priority")
service.store_choices(project_serialized.object, data, "severities", project_serialized.object.severities, serializers.SeverityExportSerializer, "default_severity")
if project_serialized is None:
raise Http400(service.get_errors())
if "points" in data:
service.store_choices(project_serialized.object, data,
"points", serializers.PointsExportSerializer)
if "issue_types" in data:
service.store_choices(project_serialized.object, data,
"issue_types",
serializers.IssueTypeExportSerializer)
if "issue_statuses" in data:
service.store_choices(project_serialized.object, data,
"issue_statuses",
serializers.IssueStatusExportSerializer,)
if "us_statuses" in data:
service.store_choices(project_serialized.object, data,
"us_statuses",
serializers.UserStoryStatusExportSerializer,)
if "task_statuses" in data:
service.store_choices(project_serialized.object, data,
"task_statuses",
serializers.TaskStatusExportSerializer)
if "priorities" in data:
service.store_choices(project_serialized.object, data,
"priorities",
serializers.PriorityExportSerializer)
if "severities" in data:
service.store_choices(project_serialized.object, data,
"severities",
serializers.SeverityExportSerializer)
if ("points" in data or "issues_types" in data or
"issues_statuses" in data or "us_statuses" in data or
"task_statuses" in data or "priorities" in data or
"severities" in data):
service.store_default_choices(project_serialized.object, data)
if "roles" in data:
service.store_roles(project_serialized.object, data)
if "memberships" in data:
service.store_memberships(project_serialized.object, data)
headers = self.get_success_headers(project_serialized.data)
return Response(project_serialized.data, status=status.HTTP_201_CREATED, headers=headers)
return Response(service.get_errors(), status=status.HTTP_400_BAD_REQUEST)
if project_serialized.object.memberships.filter(user=project_serialized.object.owner).count() == 0:
if project_serialized.object.roles.all().count() > 0:
Membership.objects.create(
project=project_serialized.object,
email=project_serialized.object.owner.email,
user=project_serialized.object.owner,
role=project_serialized.object.roles.all().first(),
is_owner=True
)
errors = service.get_errors()
if errors:
raise Http400(errors)
response_data = project_serialized.data
response_data['id'] = project_serialized.object.id
headers = self.get_success_headers(response_data)
return Response(response_data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def issue(self, request, *args, **kwargs):
self.check_permissions(request, 'import_item', serializer.object)
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change", "refissue")):
issue = service.store_issue(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(issue.data)
return Response(issue.data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def task(self, request, *args, **kwargs):
self.check_permissions(request, 'import_item', serializer.object)
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change", "reftask")):
task = service.store_task(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(task.data)
return Response(task.data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def us(self, request, *args, **kwargs):
self.check_permissions(request, 'import_item', serializer.object)
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change", "refus")):
us = service.store_user_story(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(us.data)
return Response(us.data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def milestone(self, request, *args, **kwargs):
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change")):
milestone = service.store_milestone(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(milestone.data)
return Response(milestone.data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def wiki_page(self, request, *args, **kwargs):
self.check_permissions(request, 'import_item', serializer.object)
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change")):
wiki_page = service.store_wiki_page(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(wiki_page.data)
return Response(wiki_page.data, status=status.HTTP_201_CREATED, headers=headers)
@detail_route(methods=['post'])
@method_decorator(atomic)
def wiki_link(self, request, *args, **kwargs):
self.check_permissions(request, 'import_item', serializer.object)
project = self.get_object_or_none()
self.check_permissions(request, 'import_item', project)
with without_signals((signals.post_save, "events_dispatcher_on_change")):
wiki_link = service.store_wiki_link(project, request.DATA)
errors = service.get_errors()
if errors:
raise Http400(errors)
headers = self.get_success_headers(wiki_link.data)
return Response(wiki_link.data, status=status.HTTP_201_CREATED, headers=headers)

View File

@ -0,0 +1,158 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import signals
from taiga.base.utils.signals import without_signals
from . import serializers
from . import service
class TaigaImportError(Exception):
def __init__(self, message):
self.message = message
def store_milestones(project, data):
results = []
for milestone_data in data.get('milestones', []):
milestone = service.store_milestone(project, milestone_data)
results.append(milestone)
return results
def store_tasks(project, data):
results = []
for task in data.get('tasks', []):
task = service.store_task(project, task)
results.append(task)
return results
def store_wiki_pages(project, data):
results = []
for wiki_page in data.get('wiki_pages', []):
results.append(service.store_wiki_page(project, wiki_page))
return results
def store_wiki_links(project, data):
results = []
for wiki_link in data.get('wiki_links', []):
results.append(service.store_wiki_link(project, wiki_link))
return results
def store_user_stories(project, data):
results = []
for userstory in data.get('user_stories', []):
us = service.store_user_story(project, userstory)
results.append(us)
return results
def store_issues(project, data):
issues = []
for issue in data.get('issues', []):
issues.append(service.store_issue(project, issue))
return issues
def dict_to_project(data, owner=None):
if owner:
data['owner'] = owner
with without_signals([signals.post_save, "project_post_save"]):
project_serialized = service.store_project(data)
if not project_serialized:
raise TaigaImportError('error importing project')
proj = project_serialized.object
service.store_choices(proj, data, "points", serializers.PointsExportSerializer)
service.store_choices(proj, data, "issue_types", serializers.IssueTypeExportSerializer)
service.store_choices(proj, data, "issue_statuses", serializers.IssueStatusExportSerializer)
service.store_choices(proj, data, "us_statuses", serializers.UserStoryStatusExportSerializer)
service.store_choices(proj, data, "task_statuses", serializers.TaskStatusExportSerializer)
service.store_choices(proj, data, "priorities", serializers.PriorityExportSerializer)
service.store_choices(proj, data, "severities", serializers.SeverityExportSerializer)
if service.get_errors(clear=False):
raise TaigaImportError('error importing choices')
service.store_default_choices(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing default choices')
with without_signals([signals.post_save, "role_post_save"]):
service.store_roles(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing roles')
service.store_memberships(proj, data)
if proj.memberships.filter(user=proj.owner).count() == 0:
if proj.roles.all().count() > 0:
Membership.objects.create(
project=proj,
email=proj.owner.email,
user=proj.owner,
role=proj.roles.all().first(),
is_owner=True
)
if service.get_errors(clear=False):
raise TaigaImportError('error importing memberships')
with without_signals((signals.post_save, "events_dispatcher_on_change")):
store_milestones(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing milestones')
with without_signals((signals.post_save, "events_dispatcher_on_change")):
store_wiki_pages(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing wiki pages')
with without_signals((signals.post_save, "events_dispatcher_on_change")):
store_wiki_links(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing wiki links')
with without_signals((signals.post_save, "events_dispatcher_on_change", "user_story_create_role_points_handler", "refus")):
store_user_stories(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing user stories')
with without_signals((signals.post_save, "events_dispatcher_on_change", "refissue")):
store_issues(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing issues')
with without_signals((signals.post_save, "events_dispatcher_on_change", "reftask")):
store_tasks(proj, data)
if service.get_errors(clear=False):
raise TaigaImportError('error importing issues')

View File

@ -1,18 +1,49 @@
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.db.models import signals
from optparse import make_option
import json
import pprint
from taiga.projects.models import Project
from taiga.export_import.renderers import ExportRenderer
from taiga.export_import.service import dict_to_project
from taiga.export_import.dump_service import dict_to_project, TaigaImportError
from taiga.export_import.service import get_errors
class Command(BaseCommand):
args = '<dump_file> <owner-email>'
help = 'Export a project to json'
renderer_context = {"indent": 4}
renderer = ExportRenderer()
option_list = BaseCommand.option_list + (
make_option('--overwrite',
action='store_true',
dest='overwrite',
default=False,
help='Delete project if exists'),
)
def handle(self, *args, **options):
data = json.loads(open(args[0], 'r').read())
try:
with transaction.atomic():
if options["overwrite"]:
receivers_back = signals.post_delete.receivers
signals.post_delete.receivers = []
try:
proj = Project.objects.get(slug=data.get("slug", "not a slug"))
proj.tasks.all().delete()
proj.user_stories.all().delete()
proj.issues.all().delete()
proj.memberships.all().delete()
proj.roles.all().delete()
proj.delete()
except Project.DoesNotExist:
pass
signals.post_delete.receivers = receivers_back
dict_to_project(data, args[1])
except TaigaImportError as e:
print("ERROR:", end=" ")
print(e.message)
print(get_errors())

View File

@ -14,31 +14,30 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.contenttypes.models import ContentType
from django.core.files.base import ContentFile
from rest_framework import serializers
import json
import base64
import os
import io
from collections import OrderedDict
from django.contrib.contenttypes.models import ContentType
from django.core.files.base import ContentFile
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from rest_framework import serializers
from taiga.projects import models as projects_models
from taiga.projects.userstories import models as userstories_models
from taiga.projects.tasks import models as tasks_models
from taiga.projects.issues import models as issues_models
from taiga.projects.milestones import models as milestones_models
from taiga.projects.wiki import models as wiki_models
from taiga.projects.votes import models as votes_models
from taiga.projects.notifications import models as notifications_models
from taiga.projects.history import models as history_models
from taiga.projects.attachments import models as attachments_models
from taiga.users import models as users_models
from taiga.projects.votes import services as votes_service
from taiga.projects.history import services as history_service
from taiga.base.serializers import JsonField, PgArrayField
from taiga import mdrender
class AttachedFileField(serializers.WritableField):
read_only = False
@ -72,6 +71,15 @@ class UserRelatedField(serializers.RelatedField):
except users_models.User.DoesNotExist:
return None
class CommentField(serializers.WritableField):
read_only = False
def field_from_native(self, data, files, field_name, into):
super().field_from_native(data, files, field_name, into)
into["comment_html"] = mdrender.render(self.context['project'], data.get("comment", ""))
class ProjectRelatedField(serializers.RelatedField):
read_only = False
@ -88,8 +96,99 @@ class ProjectRelatedField(serializers.RelatedField):
try:
kwargs = {self.slug_field: data, "project": self.context['project']}
return self.queryset.get(**kwargs)
except self.parent.opts.model.DoesNotExist:
return None
except ObjectDoesNotExist:
raise ValidationError("{}=\"{}\" not found in this project".format(self.slug_field, data))
class HistoryUserField(JsonField):
def to_native(self, obj):
if obj is None:
return []
try:
user = users_models.User.objects.get(pk=obj['pk'])
except users_models.User.DoesNotExist:
user = None
return (UserRelatedField().to_native(user), obj['name'])
def from_native(self, data):
if data is None:
return []
if len(data) < 2:
return []
return {"pk": UserRelatedField().from_native(data[0]).pk, "name": data[1]}
class HistoryValuesField(JsonField):
def to_native(self, obj):
if obj is None:
return []
if "users" in obj:
obj['users'] = map(HistoryUserField().to_native, obj['users'])
return obj
def from_native(self, data):
if data is None:
return []
if "users" in data:
data['users'] = map(HistoryUserField().from_native, data['users'])
return data
class HistoryDiffField(JsonField):
def to_native(self, obj):
if obj is None:
return []
if "assigned_to" in obj:
obj['assigned_to'] = HistoryUserField().to_native(obj['assigned_to'])
return obj
def from_native(self, data):
if data is None:
return []
if "assigned_to" in data:
data['assigned_to'] = HistoryUserField().from_native(data['assigned_to'])
return data
class HistoryExportSerializer(serializers.ModelSerializer):
user = HistoryUserField()
diff = HistoryDiffField(required=False)
snapshot = JsonField(required=False)
values = HistoryValuesField(required=False)
comment = CommentField(required=False)
class Meta:
model = history_models.HistoryEntry
exclude = ("id", "comment_html")
class HistoryExportSerializerMixin(serializers.ModelSerializer):
history = serializers.SerializerMethodField("get_history")
def get_history(self, obj):
history_qs = history_service.get_history_queryset_by_model_instance(obj)
return HistoryExportSerializer(history_qs, many=True).data
class AttachmentExportSerializer(serializers.ModelSerializer):
owner = UserRelatedField(required=False)
attached_file = AttachedFileField()
modified_date = serializers.DateTimeField(required=False)
class Meta:
model = attachments_models.Attachment
exclude = ('id', 'content_type', 'object_id', 'project')
class AttachmentExportSerializerMixin(serializers.ModelSerializer):
attachments = serializers.SerializerMethodField("get_attachments")
def get_attachments(self, obj):
content_type = ContentType.objects.get_for_model(obj.__class__)
attachments_qs = attachments_models.Attachment.objects.filter(object_id=obj.pk, content_type=content_type)
return AttachmentExportSerializer(attachments_qs, many=True).data
class PointsExportSerializer(serializers.ModelSerializer):
@ -97,36 +196,43 @@ class PointsExportSerializer(serializers.ModelSerializer):
model = projects_models.Points
exclude = ('id', 'project')
class UserStoryStatusExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.UserStoryStatus
exclude = ('id', 'project')
class TaskStatusExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.TaskStatus
exclude = ('id', 'project')
class IssueStatusExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.IssueStatus
exclude = ('id', 'project')
class PriorityExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.Priority
exclude = ('id', 'project')
class SeverityExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.Severity
exclude = ('id', 'project')
class IssueTypeExportSerializer(serializers.ModelSerializer):
class Meta:
model = projects_models.IssueType
exclude = ('id', 'project')
class RoleExportSerializer(serializers.ModelSerializer):
permissions = PgArrayField(required=False)
@ -134,9 +240,10 @@ class RoleExportSerializer(serializers.ModelSerializer):
model = users_models.Role
exclude = ('id', 'project')
class MembershipExportSerializer(serializers.ModelSerializer):
user = UserRelatedField(required=False)
role = ProjectRelatedField(slug_field="slug")
role = ProjectRelatedField(slug_field="name")
class Meta:
model = projects_models.Membership
@ -147,120 +254,62 @@ class MembershipExportSerializer(serializers.ModelSerializer):
class RolePointsExportSerializer(serializers.ModelSerializer):
role = ProjectRelatedField(slug_field="slug")
role = ProjectRelatedField(slug_field="name")
points = ProjectRelatedField(slug_field="name")
class Meta:
model = userstories_models.RolePoints
exclude = ('id', 'user_story')
class MilestoneExportSerializer(serializers.ModelSerializer):
owner = UserRelatedField(required=False)
watchers = UserRelatedField(many=True, required=False)
tasks_without_us = serializers.SerializerMethodField("get_tasks_without_us")
modified_date = serializers.DateTimeField(required=False)
class Meta:
model = milestones_models.Milestone
exclude = ('id', 'project')
def get_tasks_without_us(self, obj):
queryset = tasks_models.Task.objects.filter(milestone=obj, user_story__isnull=True)
return TaskExportSerializer(queryset.order_by('ref'), many=True).data
class AttachmentExportSerializer(serializers.ModelSerializer):
owner = UserRelatedField()
attached_file = AttachedFileField()
class Meta:
model = attachments_models.Attachment
exclude = ('id', 'content_type', 'object_id', 'project')
class AttachmentExportSerializerMixin(serializers.ModelSerializer):
attachments = serializers.SerializerMethodField("get_attachments")
def get_attachments(self, obj):
content_type = ContentType.objects.get_for_model(obj.__class__)
attachments_qs = attachments_models.Attachment.objects.filter(object_id=obj.pk, content_type=content_type)
return AttachmentExportSerializer(attachments_qs, many=True).data
class TaskExportSerializer(AttachmentExportSerializerMixin, serializers.ModelSerializer):
class TaskExportSerializer(HistoryExportSerializerMixin, AttachmentExportSerializerMixin, serializers.ModelSerializer):
owner = UserRelatedField(required=False)
status = ProjectRelatedField(slug_field="name", required=False)
milestone = ProjectRelatedField(slug_field="slug", required=False)
status = ProjectRelatedField(slug_field="name")
user_story = ProjectRelatedField(slug_field="ref", required=False)
milestone = ProjectRelatedField(slug_field="name", required=False)
assigned_to = UserRelatedField(required=False)
watchers = UserRelatedField(many=True, required=False)
modified_date = serializers.DateTimeField(required=False)
class Meta:
model = tasks_models.Task
exclude = ('id', 'project', 'user_story')
exclude = ('id', 'project')
class UserStoryExportSerializer(AttachmentExportSerializerMixin, serializers.ModelSerializer):
class UserStoryExportSerializer(HistoryExportSerializerMixin, AttachmentExportSerializerMixin, serializers.ModelSerializer):
role_points = RolePointsExportSerializer(many=True, required=False)
generated_from_issue = ProjectRelatedField(slug_field="ref", required=False)
owner = UserRelatedField(required=False)
status = ProjectRelatedField(slug_field="name", required=False)
tasks = TaskExportSerializer(many=True, required=False)
milestone = ProjectRelatedField(slug_field="slug", required=False)
status = ProjectRelatedField(slug_field="name")
milestone = ProjectRelatedField(slug_field="name", required=False)
watchers = UserRelatedField(many=True, required=False)
modified_date = serializers.DateTimeField(required=False)
class Meta:
model = userstories_models.UserStory
exclude = ('id', 'project', 'points')
def _convert_user(user_pk):
try:
user = users_models.User.objects.get(pk=user_pk)
except users_models.User.DoesNotExist:
return "#imported#{}".format(user_pk)
return user.email
def _convert_user_tuple(user_tuple):
return (_convert_user(user_tuple[0]), user_tuple[1])
class HistoryExportSerializer(serializers.ModelSerializer):
user = serializers.SerializerMethodField("get_user")
diff = serializers.SerializerMethodField("get_diff")
snapshot = JsonField()
values = serializers.SerializerMethodField("get_values")
def get_user(self, obj):
return (_convert_user(obj.user['pk']), obj.user['name'])
def get_values(self, obj):
for key, value in obj.values.items():
if key == "users":
obj.values["users"] = dict(map(_convert_user_tuple, value.items()))
return obj.values
def get_diff(self, obj):
for key, value in obj.diff.items():
if key == "assigned_to":
obj.diff["assigned_to"] = map(_convert_user, value)
return obj.diff
class Meta:
model = history_models.HistoryEntry
class HistoryExportSerializerMixin(serializers.ModelSerializer):
history = serializers.SerializerMethodField("get_history")
def get_history(self, obj):
history_qs = history_service.get_history_queryset_by_model_instance(obj)
return HistoryExportSerializer(history_qs, many=True).data
exclude = ('id', 'project', 'points', 'tasks')
class IssueExportSerializer(HistoryExportSerializerMixin, AttachmentExportSerializerMixin, serializers.ModelSerializer):
owner = UserRelatedField(required=False)
status = ProjectRelatedField(slug_field="name", required=False)
status = ProjectRelatedField(slug_field="name")
assigned_to = UserRelatedField(required=False)
priority = ProjectRelatedField(slug_field="name", required=False)
severity = ProjectRelatedField(slug_field="name", required=False)
type = ProjectRelatedField(slug_field="name", required=False)
milestone = ProjectRelatedField(slug_field="slug", required=False)
priority = ProjectRelatedField(slug_field="name")
severity = ProjectRelatedField(slug_field="name")
type = ProjectRelatedField(slug_field="name")
milestone = ProjectRelatedField(slug_field="name", required=False)
watchers = UserRelatedField(many=True, required=False)
votes = serializers.SerializerMethodField("get_votes")
modified_date = serializers.DateTimeField(required=False)
def get_votes(self, obj):
return [x.email for x in votes_service.get_voters(obj)]
@ -269,20 +318,24 @@ class IssueExportSerializer(HistoryExportSerializerMixin, AttachmentExportSerial
model = issues_models.Issue
exclude = ('id', 'project')
class WikiPageExportSerializer(AttachmentExportSerializerMixin, serializers.ModelSerializer):
class WikiPageExportSerializer(HistoryExportSerializerMixin, AttachmentExportSerializerMixin, serializers.ModelSerializer):
owner = UserRelatedField(required=False)
last_modifier = UserRelatedField(required=False)
watchers = UserRelatedField(many=True, required=False)
modified_date = serializers.DateTimeField(required=False)
class Meta:
model = wiki_models.WikiPage
exclude = ('id', 'project')
class WikiLinkExportSerializer(serializers.ModelSerializer):
class Meta:
model = wiki_models.WikiLink
exclude = ('id', 'project')
class ProjectExportSerializer(serializers.ModelSerializer):
owner = UserRelatedField(required=False)
default_points = serializers.SlugRelatedField(slug_field="name", required=False)
@ -305,6 +358,7 @@ class ProjectExportSerializer(serializers.ModelSerializer):
wiki_pages = WikiPageExportSerializer(many=True, required=False)
wiki_links = WikiLinkExportSerializer(many=True, required=False)
user_stories = UserStoryExportSerializer(many=True, required=False)
tasks = TaskExportSerializer(many=True, required=False)
issues = IssueExportSerializer(many=True, required=False)
tags_colors = JsonField(required=False)
anon_permissions = PgArrayField(required=False)

View File

@ -14,28 +14,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import signals
from django.db import transaction
import uuid
from django.contrib.contenttypes.models import ContentType
from taiga.projects.models import Project
from taiga.projects.history.services import make_key_from_model_object
from taiga.projects.references import sequences as seq
from taiga.projects.references import models as refs
from taiga.projects.services import find_invited_user
from . import serializers
_errors_log = []
_errors_log = {}
def get_errors():
def get_errors(clear=True):
_errors = _errors_log.copy()
if clear:
_errors_log.clear()
return _errors
def add_errors(errors):
_errors_log.append(errors)
def add_errors(section, errors):
if section in _errors_log:
_errors_log[section].append(errors)
else:
_errors_log[section] = [errors]
def project_to_dict(project):
return serializers.ProjectExportSerializer(project).data
@transaction.atomic
def store_project(data):
project_data = {}
for key, value in data.items():
@ -45,7 +54,7 @@ def store_project(data):
"default_issue_type", "memberships", "points", "us_statuses",
"task_statuses", "issue_statuses", "priorities", "severities",
"issue_types", "roles", "milestones", "wiki_pages",
"wiki_links", "notify_policies", "user_stories", "issues"
"wiki_links", "notify_policies", "user_stories", "issues", "tasks",
]
if key not in excluded_fields:
project_data[key] = value
@ -55,174 +64,288 @@ def store_project(data):
serialized.object._importing = True
serialized.object.save()
return serialized
else:
add_errors(serialized.errors)
add_errors("project", serialized.errors)
return None
@transaction.atomic
def store_choices(project, data, field, relation, serializer, default_field):
relation.all().delete()
for point in data[field]:
serialized = serializer(data=point)
serialized.is_valid()
def store_choice(project, data, field, serializer):
serialized = serializer(data=data)
if serialized.is_valid():
serialized.object.project = project
serialized.object._importing = True
serialized.save()
return serialized.object
add_errors(field, serialized.errors)
return None
def store_choices(project, data, field, serializer):
result = []
for choice_data in data[field]:
result.append(store_choice(project, choice_data, field, serializer))
return result
def store_role(project, role):
serialized = serializers.RoleExportSerializer(data=role)
if serialized.is_valid():
serialized.object.project = project
serialized.object._importing = True
serialized.save()
return serialized
add_errors("roles", serialized.errors)
return None
def store_roles(project, data):
results = []
for role in data['roles']:
results.append(store_role(project, role))
return results
@transaction.atomic
def store_default_choices(project, data):
project.default_points = project.points.all().get(name=data['default_points'])
project.default_issue_type = project.issue_types.get(name=data['default_issue_type'])
project.default_issue_status = project.issue_statuses.get(name=data['default_issue_status'])
project.default_us_status = project.us_statuses.get(name=data['default_us_status'])
project.default_task_status = project.task_statuses.get(name=data['default_task_status'])
project.default_priority = project.priorities.get(name=data['default_priority'])
project.default_severity = project.severities.get(name=data['default_severity'])
def helper(project, field, related, data):
if field in data:
value = related.all().get(name=data[field])
else:
value = related.all().first()
setattr(project, field, value)
helper(project, "default_points", project.points, data)
helper(project, "default_issue_type", project.issue_types, data)
helper(project, "default_issue_status", project.issue_statuses, data)
helper(project, "default_us_status", project.us_statuses, data)
helper(project, "default_task_status", project.task_statuses, data)
helper(project, "default_priority", project.priorities, data)
helper(project, "default_severity", project.severities, data)
project._importing = True
project.save()
@transaction.atomic
def store_roles(project, data):
project.roles.all().delete()
for role in data['roles']:
serialized = serializers.RoleExportSerializer(data=role)
serialized.is_valid()
serialized.object.project = project
serialized.object._importing = True
serialized.save()
@transaction.atomic
def store_memberships(project, data):
for membership in data['memberships']:
def store_membership(project, membership):
serialized = serializers.MembershipExportSerializer(data=membership, context={"project": project})
serialized.is_valid()
if serialized.is_valid():
serialized.object.project = project
serialized.object._importing = True
if not serialized.object.token:
serialized.object.token = str(uuid.uuid1())
serialized.object.user = find_invited_user(serialized.object, default=serialized.object.user)
serialized.save()
return serialized
add_errors("memberships", serialized.errors)
return None
def store_memberships(project, data):
results = []
for membership in data.get('memberships', []):
results.append(store_membership(project, membership))
return results
def store_task(project, task):
if 'status' not in task and project.default_task_status:
task['status'] = project.default_task_status.name
@transaction.atomic
def store_task(project, us, task):
serialized = serializers.TaskExportSerializer(data=task, context={"project": project})
serialized.is_valid()
serialized.object.user_story = us
if serialized.is_valid():
serialized.object.project = project
if serialized.object.owner is None:
serialized.object.owner = serialized.object.project.owner
serialized.object._importing = True
serialized.object._not_notify = True
serialized.save()
for task_attachment in task['attachments']:
if serialized.object.ref:
sequence_name = refs.make_sequence_name(project)
if not seq.exists(sequence_name):
seq.create(sequence_name)
seq.set_max(sequence_name, serialized.object.ref)
else:
serialized.object.ref, _ = refs.make_reference(serialized.object, project)
serialized.object.save()
for task_attachment in task.get('attachments', []):
store_attachment(project, serialized.object, task_attachment)
@transaction.atomic
def store_milestones(project, data):
for milestone in data['milestones']:
for history in task.get('history', []):
store_history(project, serialized.object, history)
return serialized
add_errors("tasks", serialized.errors)
return None
def store_milestone(project, milestone):
serialized = serializers.MilestoneExportSerializer(data=milestone)
serialized.is_valid()
if serialized.is_valid():
serialized.object.project = project
serialized.object._importing = True
serialized.save()
for task_without_us in milestone['tasks_without_us']:
store_task(project, None, task_without_us)
for task_without_us in milestone.get('tasks_without_us', []):
task_without_us['user_story'] = None
store_task(project, task_without_us)
return serialized
add_errors("milestones", serialized.errors)
return None
def store_attachment(project, obj, attachment):
serialized = serializers.AttachmentExportSerializer(data=attachment)
serialized.is_valid()
if serialized.is_valid():
serialized.object.content_type = ContentType.objects.get_for_model(obj.__class__)
serialized.object.object_id = obj.id
serialized.object.project = project
if serialized.object.owner is None:
serialized.object.owner = serialized.object.project.owner
serialized.object._importing = True
serialized.save()
return serialized
add_errors("attachments", serialized.errors)
return serialized
@transaction.atomic
def store_wiki_pages(project, data):
for wiki_page in data['wiki_pages']:
def store_history(project, obj, history):
serialized = serializers.HistoryExportSerializer(data=history, context={"project": project})
if serialized.is_valid():
serialized.object.key = make_key_from_model_object(obj)
if serialized.object.diff is None:
serialized.object.diff = []
serialized.object._importing = True
serialized.save()
return serialized
add_errors("history", serialized.errors)
return serialized
def store_wiki_page(project, wiki_page):
serialized = serializers.WikiPageExportSerializer(data=wiki_page)
serialized.is_valid()
if serialized.is_valid():
serialized.object.project = project
if serialized.object.owner is None:
serialized.object.owner = serialized.object.project.owner
serialized.object._importing = True
serialized.object._not_notify = True
serialized.save()
for attachment in wiki_page['attachments']:
for attachment in wiki_page.get('attachments', []):
store_attachment(project, serialized.object, attachment)
@transaction.atomic
def store_wiki_links(project, data):
for wiki_link in data['wiki_links']:
for history in wiki_page.get('history', []):
store_history(project, serialized.object, history)
return serialized
add_errors("wiki_pages", serialized.errors)
return None
def store_wiki_link(project, wiki_link):
serialized = serializers.WikiLinkExportSerializer(data=wiki_link)
serialized.is_valid()
if serialized.is_valid():
serialized.object.project = project
serialized.object._importing = True
serialized.save()
return serialized
add_errors("wiki_links", serialized.errors)
return None
@transaction.atomic
def store_role_point(project, us, role_point):
serialized = serializers.RolePointsExportSerializer(data=role_point, context={"project": project} )
serialized.is_valid()
serialized = serializers.RolePointsExportSerializer(data=role_point, context={"project": project})
if serialized.is_valid():
serialized.object.user_story = us
serialized.save()
return serialized.object
add_errors("role_points", serialized.errors)
return None
def store_user_story(project, userstory):
if 'status' not in userstory and project.default_us_status:
userstory['status'] = project.default_us_status.name
@transaction.atomic
def store_user_stories(project, data):
for userstory in data['user_stories']:
userstory_data = {}
for key, value in userstory.items():
excluded_fields = [
'tasks', 'role_points'
]
if key not in excluded_fields:
if key != 'role_points':
userstory_data[key] = value
serialized_us = serializers.UserStoryExportSerializer(data=userstory_data, context={"project": project})
serialized_us.is_valid()
if serialized_us.is_valid():
serialized_us.object.project = project
if serialized_us.object.owner is None:
serialized_us.object.owner = serialized_us.object.project.owner
serialized_us.object._importing = True
serialized_us.object._not_notify = True
serialized_us.save()
for task in userstory['tasks']:
store_task(project, serialized_us.object, task)
if serialized_us.object.ref:
sequence_name = refs.make_sequence_name(project)
if not seq.exists(sequence_name):
seq.create(sequence_name)
seq.set_max(sequence_name, serialized_us.object.ref)
else:
serialized_us.object.ref, _ = refs.make_reference(serialized_us.object, project)
serialized_us.object.save()
for us_attachment in userstory['attachments']:
for us_attachment in userstory.get('attachments', []):
store_attachment(project, serialized_us.object, us_attachment)
for role_point in userstory['role_points']:
for role_point in userstory.get('role_points', []):
store_role_point(project, serialized_us.object, role_point)
@transaction.atomic
def store_issues(project, data):
for issue in data['issues']:
serialized = serializers.IssueExportSerializer(data=issue, context={"project": project})
serialized.is_valid()
for history in userstory.get('history', []):
store_history(project, serialized_us.object, history)
return serialized_us
add_errors("user_stories", serialized_us.errors)
return None
def store_issue(project, data):
serialized = serializers.IssueExportSerializer(data=data, context={"project": project})
if 'type' not in data and project.default_issue_type:
data['type'] = project.default_issue_type.name
if 'status' not in data and project.default_issue_status:
data['status'] = project.default_issue_status.name
if 'priority' not in data and project.default_priority:
data['priority'] = project.default_priority.name
if 'severity' not in data and project.default_severity:
data['severity'] = project.default_severity.name
if serialized.is_valid():
serialized.object.project = project
if serialized.object.owner is None:
serialized.object.owner = serialized.object.project.owner
serialized.object._importing = True
serialized.object._not_notify = True
serialized.save()
for attachment in issue['attachments']:
if serialized.object.ref:
sequence_name = refs.make_sequence_name(project)
if not seq.exists(sequence_name):
seq.create(sequence_name)
seq.set_max(sequence_name, serialized.object.ref)
else:
serialized.object.ref, _ = refs.make_reference(serialized.object, project)
serialized.object.save()
for attachment in data.get('attachments', []):
store_attachment(project, serialized.object, attachment)
def dict_to_project(data, owner=None):
signals.pre_save.receivers = []
signals.post_save.receivers = []
signals.pre_delete.receivers = []
signals.post_delete.receivers = []
if owner:
data['owner'] = owner
project_serialized = store_project(data)
store_choices(project_serialized.object, data, "points", project_serialized.object.points, serializers.PointsExportSerializer, "default_points")
store_choices(project_serialized.object, data, "issue_types", project_serialized.object.issue_types, serializers.IssueTypeExportSerializer, "default_issue_type")
store_choices(project_serialized.object, data, "issue_statuses", project_serialized.object.issue_statuses, serializers.IssueStatusExportSerializer, "default_issue_status")
store_choices(project_serialized.object, data, "us_statuses", project_serialized.object.us_statuses, serializers.UserStoryStatusExportSerializer, "default_us_status")
store_choices(project_serialized.object, data, "task_statuses", project_serialized.object.task_statuses, serializers.TaskStatusExportSerializer, "default_task_status")
store_choices(project_serialized.object, data, "priorities", project_serialized.object.priorities, serializers.PriorityExportSerializer, "default_priority")
store_choices(project_serialized.object, data, "severities", project_serialized.object.severities, serializers.SeverityExportSerializer, "default_severity")
store_default_choices(project_serialized.object, data)
store_roles(project_serialized.object, data)
store_memberships(project_serialized.object, data)
store_milestones(project_serialized.object, data)
store_wiki_pages(project_serialized.object, data)
store_wiki_links(project_serialized.object, data)
store_user_stories(project_serialized.object, data)
store_issues(project_serialized.object, data)
for history in data.get('history', []):
store_history(project, serialized.object, history)
return serialized
add_errors("issues", serialized.errors)
return None

View File

@ -36,7 +36,7 @@ def get_attachment_file_path(instance, filename):
class Attachment(models.Model):
owner = models.ForeignKey(settings.AUTH_USER_MODEL, null=False, blank=False,
owner = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=False,
related_name="change_attachments",
verbose_name=_("owner"))
project = models.ForeignKey("projects.Project", null=False, blank=False,

View File

@ -76,6 +76,18 @@ class Issue(OCCModelMixin, WatchedModelMixin, BlockedMixin, TaggedMixin, models.
if not self._importing or not self.modified_date:
self.modified_date = timezone.now()
if not self.status:
self.status = self.project.default_issue_status
if not self.type:
self.type = self.project.default_issue_type
if not self.severity:
self.severity = self.project.default_severity
if not self.priority:
self.priority = self.project.default_priority
return super().save(*args, **kwargs)
def __str__(self):

View File

@ -764,6 +764,9 @@ def project_post_save(sender, instance, created, **kwargs):
if not created:
return
if instance._importing:
return
template = getattr(instance, "creation_template", None)
if template is None:
template = ProjectTemplate.objects.get(slug=settings.DEFAULT_PROJECT_TEMPLATE)

View File

@ -37,6 +37,8 @@ class WatchedResourceMixin(object):
after it on inheritance definition.
"""
_not_notify = False
def send_notifications(self, obj, history=None):
"""
Shortcut method for resources with special save
@ -50,6 +52,9 @@ class WatchedResourceMixin(object):
if not history:
return
if self._not_notify:
return
obj = self.get_object_for_snapshot(obj)
# Process that analizes the corresponding diff and

View File

@ -57,4 +57,9 @@ def next_value(seqname):
result = cursor.fetchone()
return result[0]
def set_max(seqname, new_value):
sql = "SELECT setval(%s, GREATEST(nextval(%s), %s));"
with closing(connection.cursor()) as cursor:
cursor.execute(sql, [seqname, seqname, new_value])
result = cursor.fetchone()
return result[0]

View File

@ -77,6 +77,9 @@ class Task(OCCModelMixin, WatchedModelMixin, BlockedMixin, TaggedMixin, models.M
if not self._importing or not self.modified_date:
self.modified_date = timezone.now()
if not self.status:
self.status = self.project.default_task_status
return super().save(*args, **kwargs)
def __str__(self):

View File

@ -112,6 +112,9 @@ class UserStory(OCCModelMixin, WatchedModelMixin, BlockedMixin, TaggedMixin, mod
if not self._importing or not self.modified_date:
self.modified_date = timezone.now()
if not self.status:
self.status = self.project.default_us_status
super().save(*args, **kwargs)
def __str__(self):

View File

@ -52,3 +52,5 @@ def mediafiles_urlpatterns():
urlpatterns += staticfiles_urlpatterns(prefix="/static/")
urlpatterns += mediafiles_urlpatterns()
handler500 = "taiga.base.api.views.api_server_error"

View File

@ -64,6 +64,9 @@ class UsersViewSet(ModelCrudViewSet):
queryset = models.User.objects.all()
filter_backends = (MembersFilterBackend,)
def create(self, *args, **kwargs):
raise exc.NotSupported()
def pre_conditions_on_save(self, obj):
if self.request.user.is_superuser:
return

View File

@ -28,7 +28,6 @@ class UserPermission(TaigaResourcePermission):
enought_perms = IsSuperUser()
global_perms = None
retrieve_perms = AllowAny()
create_perms = AllowAny()
update_perms = IsTheSameUser()
destroy_perms = IsTheSameUser()
list_perms = AllowAny()

View File

@ -70,7 +70,7 @@ class ProjectFactory(Factory):
class RoleFactory(Factory):
FACTORY_FOR = get_model("users", "Role")
name = "Tester"
name = factory.Sequence(lambda n: "Role {}".format(n))
slug = factory.Sequence(lambda n: "test-role-{}".format(n))
project = factory.SubFactory("tests.factories.ProjectFactory")
@ -102,7 +102,7 @@ class IssueAttachmentFactory(AttachmentFactory):
class WikiAttachmentFactory(AttachmentFactory):
content_object = factory.SubFactory("tests.factories.WikiFactory")
content_object = factory.SubFactory("tests.factories.WikiPageFactory")
class RolePointsFactory(Factory):

View File

@ -119,7 +119,7 @@ def test_user_create(client, data):
"full_name": "test",
})
results = helper_test_http_method(client, 'post', url, create_data, users)
assert results == [201]
assert results == [405]
def test_user_patch(client, data):

View File

@ -0,0 +1,639 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import json
import base64
import datetime
from django.core.urlresolvers import reverse
from .. import factories as f
from taiga.projects.models import Project
from taiga.projects.issues.models import Issue
from taiga.projects.userstories.models import UserStory
from taiga.projects.tasks.models import Task
from taiga.projects.wiki.models import WikiPage
from taiga.export_import.service import project_to_dict
from taiga.export_import.dump_service import dict_to_project
pytestmark = pytest.mark.django_db
def test_invalid_project_import(client):
user = f.UserFactory.create()
client.login(user)
url = reverse("importer-list")
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_project_import_without_extra_data(client):
user = f.UserFactory.create()
client.login(user)
url = reverse("importer-list")
data = {
"name": "Imported project",
"description": "Imported project",
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
must_empty_children = [
"issues", "user_stories", "roles", "us_statuses", "wiki_pages", "priorities",
"severities", "milestones", "points", "issue_types", "task_statuses",
"memberships", "issue_statuses", "wiki_links",
]
assert all(map(lambda x: len(response_data[x]) == 0, must_empty_children))
assert response_data["owner"] == user.email
def test_valid_project_import_with_not_existing_memberships(client):
user = f.UserFactory.create()
client.login(user)
url = reverse("importer-list")
data = {
"name": "Imported project",
"description": "Imported project",
"memberships": [{
"email": "bad@email.com",
"role": "Role",
}],
"roles": [{"name": "Role"}]
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
# The new membership and the owner membership
assert len(response_data["memberships"]) == 2
def test_valid_project_import_with_extra_data(client):
user = f.UserFactory.create()
client.login(user)
url = reverse("importer-list")
data = {
"name": "Imported project",
"description": "Imported project",
"roles": [{
"permissions": [],
"name": "Test"
}],
"us_statuses": [{
"name": "Test"
}],
"severities": [{
"name": "Test"
}],
"priorities": [{
"name": "Test"
}],
"points": [{
"name": "Test"
}],
"issue_types": [{
"name": "Test"
}],
"task_statuses": [{
"name": "Test"
}],
"issue_statuses": [{
"name": "Test"
}],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
must_empty_children = [
"issues", "user_stories", "wiki_pages", "milestones",
"wiki_links",
]
must_one_instance_children = [
"roles", "us_statuses", "severities", "priorities", "points",
"issue_types", "task_statuses", "issue_statuses", "memberships",
]
assert all(map(lambda x: len(response_data[x]) == 0, must_empty_children))
# Allwais is created at least the owner membership
assert all(map(lambda x: len(response_data[x]) == 1, must_one_instance_children))
assert response_data["owner"] == user.email
def test_invalid_project_import_with_extra_data(client):
user = f.UserFactory.create()
client.login(user)
url = reverse("importer-list")
data = {
"name": "Imported project",
"description": "Imported project",
"roles": [{ }],
"us_statuses": [{ }],
"severities": [{ }],
"priorities": [{ }],
"points": [{ }],
"issue_types": [{ }],
"task_statuses": [{ }],
"issue_statuses": [{ }],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 8
assert Project.objects.filter(slug="imported-project").count() == 0
def test_invalid_issue_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-issue", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_issue_import_without_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_issue_type = f.IssueTypeFactory.create(project=project)
project.default_issue_status = f.IssueStatusFactory.create(project=project)
project.default_severity = f.SeverityFactory.create(project=project)
project.default_priority = f.PriorityFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Test"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_valid_issue_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_issue_type = f.IssueTypeFactory.create(project=project)
project.default_issue_status = f.IssueStatusFactory.create(project=project)
project.default_severity = f.SeverityFactory.create(project=project)
project.default_priority = f.PriorityFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"attachments": [{
"owner": user.email,
"attached_file": {
"name": "imported attachment",
"data": base64.b64encode(b"TEST").decode("utf-8")
}
}]
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data["attachments"]) == 1
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_invalid_issue_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_issue_type = f.IssueTypeFactory.create(project=project)
project.default_issue_status = f.IssueStatusFactory.create(project=project)
project.default_severity = f.SeverityFactory.create(project=project)
project.default_priority = f.PriorityFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"attachments": [{ }],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
assert Issue.objects.filter(subject="Imported issue").count() == 0
def test_invalid_issue_import_with_bad_choices(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_issue_type = f.IssueTypeFactory.create(project=project)
project.default_issue_status = f.IssueStatusFactory.create(project=project)
project.default_severity = f.SeverityFactory.create(project=project)
project.default_priority = f.PriorityFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"status": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"priority": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"severity": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
url = reverse("importer-issue", args=[project.pk])
data = {
"subject": "Imported issue",
"description": "Imported issue",
"type": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
def test_invalid_us_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-us", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_us_import_without_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_us_status = f.UserStoryStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-us", args=[project.pk])
data = {
"subject": "Test"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_valid_us_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_us_status = f.UserStoryStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-us", args=[project.pk])
data = {
"subject": "Imported us",
"description": "Imported us",
"attachments": [{
"owner": user.email,
"attached_file": {
"name": "imported attachment",
"data": base64.b64encode(b"TEST").decode("utf-8")
}
}]
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data["attachments"]) == 1
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_invalid_us_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_us_status = f.UserStoryStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-us", args=[project.pk])
data = {
"subject": "Imported us",
"description": "Imported us",
"attachments": [{ }],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
assert UserStory.objects.filter(subject="Imported us").count() == 0
def test_invalid_us_import_with_bad_choices(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_us_status = f.UserStoryStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-us", args=[project.pk])
data = {
"subject": "Imported us",
"description": "Imported us",
"status": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
def test_invalid_task_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_task_import_without_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_task_status = f.TaskStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {
"subject": "Test"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_valid_task_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_task_status = f.TaskStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {
"subject": "Imported task",
"description": "Imported task",
"attachments": [{
"owner": user.email,
"attached_file": {
"name": "imported attachment",
"data": base64.b64encode(b"TEST").decode("utf-8")
}
}]
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data["attachments"]) == 1
assert response_data["owner"] == user.email
assert response_data["ref"] is not None
def test_invalid_task_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_task_status = f.TaskStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {
"subject": "Imported task",
"description": "Imported task",
"attachments": [{ }],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
assert Task.objects.filter(subject="Imported task").count() == 0
def test_invalid_task_import_with_bad_choices(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_task_status = f.TaskStatusFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {
"subject": "Imported task",
"description": "Imported task",
"status": "Not valid"
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
def test_valid_task_with_user_story(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
project.default_task_status = f.TaskStatusFactory.create(project=project)
us = f.UserStoryFactory.create(project=project)
project.save()
client.login(user)
url = reverse("importer-task", args=[project.pk])
data = {
"subject": "Imported task",
"description": "Imported task",
"user_story": us.ref
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
assert us.tasks.all().count() == 1
def test_invalid_wiki_page_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-page", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_wiki_page_import_without_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-page", args=[project.pk])
data = {
"slug": "imported-wiki-page",
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert response_data["owner"] == user.email
def test_valid_wiki_page_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-page", args=[project.pk])
data = {
"slug": "imported-wiki-page",
"content": "Imported wiki_page",
"attachments": [{
"owner": user.email,
"attached_file": {
"name": "imported attachment",
"data": base64.b64encode(b"TEST").decode("utf-8")
}
}]
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data["attachments"]) == 1
assert response_data["owner"] == user.email
def test_invalid_wiki_page_import_with_extra_data(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-page", args=[project.pk])
data = {
"slug": "imported-wiki-page",
"content": "Imported wiki_page",
"attachments": [{ }],
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
response_data = json.loads(response.content.decode("utf-8"))
assert len(response_data) == 1
assert WikiPage.objects.filter(slug="imported-wiki-page").count() == 0
def test_invalid_wiki_link_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-link", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_wiki_link_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-wiki-link", args=[project.pk])
data = {
"title": "Imported wiki_link",
"href": "imported-wiki-link",
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))
def test_invalid_milestone_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-milestone", args=[project.pk])
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400
def test_valid_milestone_import(client):
user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user)
client.login(user)
url = reverse("importer-milestone", args=[project.pk])
data = {
"name": "Imported milestone",
"estimated_start": "2014-10-10",
"estimated_finish": "2014-10-20",
}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 201
response_data = json.loads(response.content.decode("utf-8"))

View File

@ -10,6 +10,21 @@ from taiga.users import models
pytestmark = pytest.mark.django_db
def test_api_user_normal_user(client):
user = f.UserFactory.create(is_superuser=True)
url = reverse('users-list')
data = {}
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 405
client.login(user)
response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 405
def test_api_user_patch_same_email(client):
user = f.UserFactory.create(email="same@email.com")
url = reverse('users-detail', kwargs={"pk": user.pk})