399 lines
14 KiB
Python
399 lines
14 KiB
Python
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
|
|
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
|
|
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
import operator
|
|
from functools import reduce
|
|
import logging
|
|
|
|
from django.apps import apps
|
|
from django.db.models import Q
|
|
from django.utils.translation import ugettext as _
|
|
|
|
from taiga.base import exceptions as exc
|
|
from taiga.base.api.utils import get_object_or_404
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class BaseFilterBackend(object):
|
|
"""
|
|
A base class from which all filter backend classes should inherit.
|
|
"""
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
"""
|
|
Return a filtered queryset.
|
|
"""
|
|
raise NotImplementedError(".filter_queryset() must be overridden.")
|
|
|
|
|
|
class QueryParamsFilterMixin(BaseFilterBackend):
|
|
_special_values_dict = {
|
|
'true': True,
|
|
'false': False,
|
|
'null': None,
|
|
}
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
query_params = {}
|
|
|
|
if not hasattr(view, "filter_fields"):
|
|
return queryset
|
|
|
|
for field in view.filter_fields:
|
|
if isinstance(field, (tuple, list)):
|
|
param_name, field_name = field
|
|
else:
|
|
param_name, field_name = field, field
|
|
|
|
if param_name in request.QUERY_PARAMS:
|
|
field_data = request.QUERY_PARAMS[param_name]
|
|
if field_data in self._special_values_dict:
|
|
query_params[field_name] = self._special_values_dict[field_data]
|
|
else:
|
|
query_params[field_name] = field_data
|
|
|
|
if query_params:
|
|
try:
|
|
queryset = queryset.filter(**query_params)
|
|
except ValueError:
|
|
raise exc.BadRequest(_("Error in filter params types."))
|
|
|
|
return queryset
|
|
|
|
|
|
class OrderByFilterMixin(QueryParamsFilterMixin):
|
|
order_by_query_param = "order_by"
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
queryset = super().filter_queryset(request, queryset, view)
|
|
order_by_fields = getattr(view, "order_by_fields", None)
|
|
|
|
raw_fieldname = request.QUERY_PARAMS.get(self.order_by_query_param, None)
|
|
if not raw_fieldname or not order_by_fields:
|
|
return queryset
|
|
|
|
if raw_fieldname.startswith("-"):
|
|
field_name = raw_fieldname[1:]
|
|
else:
|
|
field_name = raw_fieldname
|
|
|
|
if field_name not in order_by_fields:
|
|
return queryset
|
|
|
|
return super().filter_queryset(request, queryset.order_by(raw_fieldname), view)
|
|
|
|
|
|
class FilterBackend(OrderByFilterMixin):
|
|
"""
|
|
Default filter backend.
|
|
"""
|
|
pass
|
|
|
|
|
|
class PermissionBasedFilterBackend(FilterBackend):
|
|
permission = None
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
project_id = None
|
|
if (hasattr(view, "filter_fields") and "project" in view.filter_fields and
|
|
"project" in request.QUERY_PARAMS):
|
|
try:
|
|
project_id = int(request.QUERY_PARAMS["project"])
|
|
except:
|
|
logger.error("Filtering project diferent value than an integer: {}".format(
|
|
request.QUERY_PARAMS["project"]
|
|
))
|
|
raise exc.BadRequest(_("'project' must be an integer value."))
|
|
|
|
qs = queryset
|
|
|
|
if request.user.is_authenticated() and request.user.is_superuser:
|
|
qs = qs
|
|
elif request.user.is_authenticated():
|
|
membership_model = apps.get_model('projects', 'Membership')
|
|
memberships_qs = membership_model.objects.filter(user=request.user)
|
|
if project_id:
|
|
memberships_qs = memberships_qs.filter(project_id=project_id)
|
|
memberships_qs = memberships_qs.filter(Q(role__permissions__contains=[self.permission]) |
|
|
Q(is_owner=True))
|
|
|
|
projects_list = [membership.project_id for membership in memberships_qs]
|
|
|
|
qs = qs.filter(Q(project_id__in=projects_list) |
|
|
Q(project__public_permissions__contains=[self.permission]))
|
|
else:
|
|
qs = qs.filter(project__anon_permissions__contains=[self.permission])
|
|
|
|
return super().filter_queryset(request, qs.distinct(), view)
|
|
|
|
|
|
class CanViewProjectFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_project"
|
|
|
|
|
|
class CanViewUsFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_us"
|
|
|
|
|
|
class CanViewIssuesFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_issues"
|
|
|
|
|
|
class CanViewTasksFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_tasks"
|
|
|
|
|
|
class CanViewWikiPagesFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_wiki_pages"
|
|
|
|
|
|
class CanViewWikiLinksFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_wiki_links"
|
|
|
|
|
|
class CanViewMilestonesFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_milestones"
|
|
|
|
|
|
class PermissionBasedAttachmentFilterBackend(PermissionBasedFilterBackend):
|
|
permission = None
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
qs = super().filter_queryset(request, queryset, view)
|
|
|
|
ct = view.get_content_type()
|
|
return qs.filter(content_type=ct)
|
|
|
|
|
|
class CanViewUserStoryAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend):
|
|
permission = "view_us"
|
|
|
|
|
|
class CanViewTaskAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend):
|
|
permission = "view_tasks"
|
|
|
|
|
|
class CanViewIssueAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend):
|
|
permission = "view_issues"
|
|
|
|
|
|
class CanViewWikiAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend):
|
|
permission = "view_wiki_pages"
|
|
|
|
|
|
class CanViewProjectObjFilterBackend(FilterBackend):
|
|
def filter_queryset(self, request, queryset, view):
|
|
project_id = None
|
|
if (hasattr(view, "filter_fields") and "project" in view.filter_fields and
|
|
"project" in request.QUERY_PARAMS):
|
|
try:
|
|
project_id = int(request.QUERY_PARAMS["project"])
|
|
except:
|
|
logger.error("Filtering project diferent value than an integer: {}".format(
|
|
request.QUERY_PARAMS["project"]
|
|
))
|
|
raise exc.BadRequest(_("'project' must be an integer value."))
|
|
|
|
qs = queryset
|
|
|
|
if request.user.is_authenticated() and request.user.is_superuser:
|
|
qs = qs
|
|
elif request.user.is_authenticated():
|
|
membership_model = apps.get_model("projects", "Membership")
|
|
memberships_qs = membership_model.objects.filter(user=request.user)
|
|
if project_id:
|
|
memberships_qs = memberships_qs.filter(project_id=project_id)
|
|
memberships_qs = memberships_qs.filter(Q(role__permissions__contains=['view_project']) |
|
|
Q(is_owner=True))
|
|
|
|
projects_list = [membership.project_id for membership in memberships_qs]
|
|
|
|
qs = qs.filter((Q(id__in=projects_list) |
|
|
Q(public_permissions__contains=["view_project"])))
|
|
else:
|
|
qs = qs.filter(anon_permissions__contains=["view_project"])
|
|
|
|
return super().filter_queryset(request, qs.distinct(), view)
|
|
|
|
|
|
class IsProjectMemberFilterBackend(FilterBackend):
|
|
def filter_queryset(self, request, queryset, view):
|
|
if request.user.is_authenticated() and request.user.is_superuser:
|
|
queryset = queryset
|
|
elif request.user.is_authenticated():
|
|
queryset = queryset.filter(project__members=request.user)
|
|
else:
|
|
queryset = queryset.none()
|
|
|
|
return super().filter_queryset(request, queryset.distinct(), view)
|
|
|
|
|
|
class MembersFilterBackend(PermissionBasedFilterBackend):
|
|
permission = "view_project"
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
project_id = None
|
|
project = None
|
|
qs = queryset.filter(is_active=True)
|
|
if "project" in request.QUERY_PARAMS:
|
|
try:
|
|
project_id = int(request.QUERY_PARAMS["project"])
|
|
except:
|
|
logger.error("Filtering project diferent value than an integer: {}".format(
|
|
request.QUERY_PARAMS["project"]))
|
|
raise exc.BadRequest(_("'project' must be an integer value."))
|
|
|
|
if project_id:
|
|
Project = apps.get_model('projects', 'Project')
|
|
project = get_object_or_404(Project, pk=project_id)
|
|
|
|
if request.user.is_authenticated() and request.user.is_superuser:
|
|
qs = qs
|
|
elif request.user.is_authenticated():
|
|
Membership = apps.get_model('projects', 'Membership')
|
|
memberships_qs = Membership.objects.filter(user=request.user)
|
|
if project_id:
|
|
memberships_qs = memberships_qs.filter(project_id=project_id)
|
|
memberships_qs = memberships_qs.filter(Q(role__permissions__contains=[self.permission]) |
|
|
Q(is_owner=True))
|
|
|
|
projects_list = [membership.project_id for membership in memberships_qs]
|
|
|
|
if project:
|
|
is_member = project.id in projects_list
|
|
has_project_public_view_permission = "view_project" in project.public_permissions
|
|
if not is_member and not has_project_public_view_permission:
|
|
qs = qs.none()
|
|
|
|
q = Q(memberships__project_id__in=projects_list) | Q(id=request.user.id)
|
|
|
|
#If there is no selected project we want access to users from public projects
|
|
if not project:
|
|
q = q | Q(memberships__project__public_permissions__contains=[self.permission])
|
|
|
|
qs = qs.filter(q)
|
|
|
|
else:
|
|
if project and not "view_project" in project.anon_permissions:
|
|
qs = qs.none()
|
|
|
|
qs = qs.filter(memberships__project__anon_permissions__contains=[self.permission])
|
|
|
|
return qs.distinct()
|
|
|
|
|
|
class BaseIsProjectAdminFilterBackend(object):
|
|
def get_project_ids(self, request, view):
|
|
project_id = None
|
|
if hasattr(view, "filter_fields") and "project" in view.filter_fields:
|
|
project_id = request.QUERY_PARAMS.get("project", None)
|
|
|
|
if request.user.is_authenticated() and request.user.is_superuser:
|
|
return None
|
|
|
|
if not request.user.is_authenticated():
|
|
return []
|
|
|
|
membership_model = apps.get_model('projects', 'Membership')
|
|
memberships_qs = membership_model.objects.filter(user=request.user, is_owner=True)
|
|
if project_id:
|
|
memberships_qs = memberships_qs.filter(project_id=project_id)
|
|
|
|
projects_list = [membership.project_id for membership in memberships_qs]
|
|
|
|
return projects_list
|
|
|
|
|
|
class IsProjectAdminFilterBackend(FilterBackend, BaseIsProjectAdminFilterBackend):
|
|
def filter_queryset(self, request, queryset, view):
|
|
project_ids = self.get_project_ids(request, view)
|
|
if project_ids is None:
|
|
queryset = queryset
|
|
elif project_ids == []:
|
|
queryset = queryset.none()
|
|
else:
|
|
queryset = queryset.filter(project_id__in=project_ids)
|
|
|
|
return super().filter_queryset(request, queryset.distinct(), view)
|
|
|
|
|
|
class IsProjectAdminFromWebhookLogFilterBackend(FilterBackend, BaseIsProjectAdminFilterBackend):
|
|
def filter_queryset(self, request, queryset, view):
|
|
project_ids = self.get_project_ids(request, view)
|
|
if project_ids is None:
|
|
queryset = queryset
|
|
elif project_ids == []:
|
|
queryset = queryset.none()
|
|
else:
|
|
queryset = queryset.filter(webhook__project_id__in=project_ids)
|
|
|
|
return super().filter_queryset(request, queryset, view)
|
|
|
|
|
|
class TagsFilter(FilterBackend):
|
|
def __init__(self, filter_name='tags'):
|
|
self.filter_name = filter_name
|
|
|
|
def _get_tags_queryparams(self, params):
|
|
tags = params.get(self.filter_name, None)
|
|
if tags:
|
|
return tags.split(",")
|
|
|
|
return None
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
query_tags = self._get_tags_queryparams(request.QUERY_PARAMS)
|
|
if query_tags:
|
|
queryset = queryset.filter(tags__contains=query_tags)
|
|
|
|
return super().filter_queryset(request, queryset, view)
|
|
|
|
|
|
class StatusFilter(FilterBackend):
|
|
def __init__(self, filter_name='status'):
|
|
self.filter_name = filter_name
|
|
|
|
def _get_status_queryparams(self, params):
|
|
status = params.get(self.filter_name, None)
|
|
if status is not None:
|
|
status = set([x.strip() for x in status.split(",")])
|
|
return list(status)
|
|
|
|
return None
|
|
|
|
def filter_queryset(self, request, queryset, view):
|
|
query_status = self._get_status_queryparams(request.QUERY_PARAMS)
|
|
if query_status:
|
|
queryset = queryset.filter(status__in=query_status)
|
|
|
|
return super().filter_queryset(request, queryset, view)
|
|
|
|
|
|
class QFilter(FilterBackend):
|
|
def filter_queryset(self, request, queryset, view):
|
|
q = request.QUERY_PARAMS.get('q', None)
|
|
if q:
|
|
if q.isdigit():
|
|
qs_args = [Q(ref=q)]
|
|
else:
|
|
qs_args = [Q(subject__icontains=x) for x in q.split()]
|
|
|
|
queryset = queryset.filter(reduce(operator.and_, qs_args))
|
|
|
|
return queryset
|