Merge pull request #153 from taigaio/bug/1609/issues-neightbors
Fixing neighbors hellremotes/origin/enhancement/email-actions
commit
9d1f2f2bb6
|
@ -19,63 +19,11 @@ from functools import partial
|
|||
from collections import namedtuple
|
||||
|
||||
from django.db.models import Q
|
||||
|
||||
from django.db import connection
|
||||
|
||||
Neighbor = namedtuple("Neighbor", "left right")
|
||||
|
||||
|
||||
def disjunction_filters(filters):
|
||||
"""From a list of queryset filters, it returns a disjunction (OR) Q object.
|
||||
|
||||
:param filters: List of filters where each item is a dict where the keys are the lookups and
|
||||
the values the values of those lookups.
|
||||
|
||||
:return: :class:`django.db.models.Q` instance representing the disjunction of the filters.
|
||||
"""
|
||||
result = Q()
|
||||
for filter in filters:
|
||||
result |= Q(**{lookup: value for lookup, value in filter.items()})
|
||||
return result
|
||||
|
||||
|
||||
def get_attribute(obj, attr):
|
||||
"""Finds `attr` in obj.
|
||||
|
||||
:param obj: Object where to look for the attribute.
|
||||
:param attr: Attribute name as a string. It can be a Django lookup field such as
|
||||
`project__owner__name`, in which case it will look for `obj.project.owner.name`.
|
||||
|
||||
:return: The attribute value.
|
||||
:raises: `AttributeError` if some attribute doesn't exist.
|
||||
"""
|
||||
chunks = attr.lstrip("-").split("__")
|
||||
attr, chunks = chunks[0], chunks[1:]
|
||||
obj = value = getattr(obj, attr)
|
||||
for path in chunks:
|
||||
value = getattr(obj, path)
|
||||
obj = value
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def transform_field_into_lookup(name, value, operator="", operator_if_desc=""):
|
||||
"""From a field name and value, return a dict that may be used as a queryset filter.
|
||||
|
||||
:param name: Field name as a string.
|
||||
:param value: Field value.
|
||||
:param operator: Operator to use in the lookup.
|
||||
:param operator_if_desc: If the field is reversed (a "-" in front) use this operator
|
||||
instead.
|
||||
|
||||
:return: A dict that may be used as a queryset filter.
|
||||
"""
|
||||
if name.startswith("-"):
|
||||
name = name[1:]
|
||||
operator = operator_if_desc
|
||||
lookup = "{}{}".format(name, operator)
|
||||
return {lookup: value}
|
||||
|
||||
|
||||
def get_neighbors(obj, results_set=None):
|
||||
"""Get the neighbors of a model instance.
|
||||
|
||||
|
@ -89,51 +37,32 @@ def get_neighbors(obj, results_set=None):
|
|||
"""
|
||||
if results_set is None or results_set.count() == 0:
|
||||
results_set = type(obj).objects.get_queryset()
|
||||
|
||||
compiler = results_set.query.get_compiler('default')
|
||||
base_sql, base_params = compiler.as_sql(with_col_aliases=True)
|
||||
|
||||
query = """
|
||||
SELECT * FROM
|
||||
(SELECT "id" as id, ROW_NUMBER() OVER()
|
||||
FROM (%s) as ID_AND_ROW)
|
||||
AS SELECTED_ID_AND_ROW
|
||||
"""%(base_sql)
|
||||
query += " WHERE id=%s;"
|
||||
params = list(base_params) + [obj.id]
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(query, params)
|
||||
row = cursor.fetchone()
|
||||
obj_position = row[1] - 1
|
||||
|
||||
try:
|
||||
left = _left_candidates(obj, results_set).reverse()[0]
|
||||
left = obj_position > 0 and results_set[obj_position - 1] or None
|
||||
except IndexError:
|
||||
left = None
|
||||
|
||||
try:
|
||||
right = _right_candidates(obj, results_set)[0]
|
||||
right = results_set[obj_position + 1]
|
||||
except IndexError:
|
||||
right = None
|
||||
|
||||
return Neighbor(left, right)
|
||||
|
||||
|
||||
def _get_candidates(obj, results_set, reverse=False):
|
||||
ordering = (results_set.query.order_by or []) + list(obj._meta.ordering)
|
||||
main_ordering, rest_ordering = ordering[0], ordering[1:]
|
||||
try:
|
||||
filters = obj.get_neighbors_additional_filters(results_set, ordering, reverse)
|
||||
if filters is None:
|
||||
raise AttributeError
|
||||
except AttributeError:
|
||||
filters = [order_field_as_filter(obj, main_ordering, reverse)]
|
||||
filters += [ordering_fields_as_filter(obj, main_ordering, rest_ordering, reverse)]
|
||||
|
||||
return (results_set
|
||||
.filter(~Q(id=obj.id), disjunction_filters(filters))
|
||||
.distinct()
|
||||
.order_by(*ordering))
|
||||
_left_candidates = partial(_get_candidates, reverse=True)
|
||||
_right_candidates = partial(_get_candidates, reverse=False)
|
||||
|
||||
|
||||
def order_field_as_filter(obj, order_field, reverse=None, operator=None):
|
||||
value = get_attribute(obj, order_field)
|
||||
if reverse is not None:
|
||||
if operator is None:
|
||||
operator = ("__gt", "__lt")
|
||||
operator = (operator[1], operator[0]) if reverse else operator
|
||||
else:
|
||||
operator = ()
|
||||
return transform_field_into_lookup(order_field, value, *operator)
|
||||
|
||||
|
||||
def ordering_fields_as_filter(obj, main_order_field, ordering_fields, reverse=False):
|
||||
"""Transform a list of ordering fields into a queryset filter."""
|
||||
filter = order_field_as_filter(obj, main_order_field)
|
||||
for field in ordering_fields:
|
||||
filter.update(order_field_as_filter(obj, field, reverse, ("__gte", "__lte")))
|
||||
return filter
|
||||
|
|
|
@ -36,48 +36,6 @@ def teardown_module():
|
|||
reconnect_signals()
|
||||
|
||||
|
||||
class TestGetAttribute:
|
||||
def test_no_attribute(self, object):
|
||||
object.full_name = "name"
|
||||
with pytest.raises(AttributeError):
|
||||
n.get_attribute(object, "name")
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
n.get_attribute(object, "full_name__last_name")
|
||||
|
||||
def test_one_level(self, object):
|
||||
object.name = "name"
|
||||
assert n.get_attribute(object, "name") == object.name
|
||||
|
||||
def test_two_levels(self, object):
|
||||
object.name = object
|
||||
object.name.full_name = "first name"
|
||||
assert n.get_attribute(object, "name__full_name") == object.name.full_name
|
||||
|
||||
def test_three_levels(self, object):
|
||||
object.info = object
|
||||
object.info.name = object
|
||||
object.info.name.full_name = "first name"
|
||||
assert n.get_attribute(object, "info__name__full_name") == object.info.name.full_name
|
||||
|
||||
|
||||
def test_transform_field_into_lookup():
|
||||
transform = partial(n.transform_field_into_lookup, value="chuck", operator="__lt",
|
||||
operator_if_desc="__gt")
|
||||
|
||||
assert transform(name="name") == {"name__lt": "chuck"}
|
||||
assert transform(name="-name") == {"name__gt": "chuck"}
|
||||
|
||||
|
||||
def test_disjunction_filters():
|
||||
filters = [{"age__lt": 21, "name__eq": "chuck"}]
|
||||
result_str = str(n.disjunction_filters(filters))
|
||||
|
||||
assert result_str.startswith("(OR: ")
|
||||
assert "('age__lt', 21)" in result_str
|
||||
assert "('name__eq', 'chuck')" in result_str
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestUserStories:
|
||||
def test_no_filters(self):
|
||||
|
@ -338,3 +296,36 @@ class TestIssues:
|
|||
assert issue1_neighbors.right == issue3
|
||||
assert issue2_neighbors.left == issue3
|
||||
assert issue2_neighbors.right is None
|
||||
|
||||
def test_ordering_by_assigned_to_desc_with_none_values(self):
|
||||
project = f.ProjectFactory.create()
|
||||
|
||||
issue1 = f.IssueFactory.create(project=project, assigned_to=None)
|
||||
issue2 = f.IssueFactory.create(project=project, assigned_to=None)
|
||||
issue3 = f.IssueFactory.create(project=project, assigned_to=None)
|
||||
|
||||
issues = Issue.objects.filter(project=project).order_by("-assigned_to__full_name")
|
||||
issue1_neighbors = n.get_neighbors(issue1, results_set=issues)
|
||||
issue2_neighbors = n.get_neighbors(issue2, results_set=issues)
|
||||
|
||||
assert issue1_neighbors.left == issue2
|
||||
assert issue1_neighbors.right is None
|
||||
assert issue2_neighbors.left == issue3
|
||||
assert issue2_neighbors.right == issue1
|
||||
|
||||
def test_ordering_by_assigned_to_desc_with_none_and_normal_values(self):
|
||||
project = f.ProjectFactory.create()
|
||||
assigned_to1 = f.UserFactory.create(full_name="Chuck Norris")
|
||||
issue1 = f.IssueFactory.create(project=project, assigned_to=None)
|
||||
issue2 = f.IssueFactory.create(project=project, assigned_to=assigned_to1)
|
||||
issue3 = f.IssueFactory.create(project=project, assigned_to=None)
|
||||
|
||||
issues = Issue.objects.filter(project=project).order_by("-assigned_to__full_name")
|
||||
|
||||
issue1_neighbors = n.get_neighbors(issue1, results_set=issues)
|
||||
issue2_neighbors = n.get_neighbors(issue2, results_set=issues)
|
||||
|
||||
assert issue1_neighbors.left == issue3
|
||||
assert issue1_neighbors.right == issue2
|
||||
assert issue2_neighbors.left == issue1
|
||||
assert issue2_neighbors.right is None
|
||||
|
|
Loading…
Reference in New Issue