From f49d143c22af4905d10d7c92d49a328df91e1306 Mon Sep 17 00:00:00 2001 From: Alejandro Alonso Date: Sat, 15 Nov 2014 01:39:29 +0100 Subject: [PATCH] Fixing neighbors hell --- taiga/base/neighbors.py | 115 ++++++---------------------- tests/integration/test_neighbors.py | 75 ++++++++---------- 2 files changed, 55 insertions(+), 135 deletions(-) diff --git a/taiga/base/neighbors.py b/taiga/base/neighbors.py index 14223487..7b4adbfb 100644 --- a/taiga/base/neighbors.py +++ b/taiga/base/neighbors.py @@ -19,63 +19,11 @@ from functools import partial from collections import namedtuple from django.db.models import Q - +from django.db import connection Neighbor = namedtuple("Neighbor", "left right") -def disjunction_filters(filters): - """From a list of queryset filters, it returns a disjunction (OR) Q object. - - :param filters: List of filters where each item is a dict where the keys are the lookups and - the values the values of those lookups. - - :return: :class:`django.db.models.Q` instance representing the disjunction of the filters. - """ - result = Q() - for filter in filters: - result |= Q(**{lookup: value for lookup, value in filter.items()}) - return result - - -def get_attribute(obj, attr): - """Finds `attr` in obj. - - :param obj: Object where to look for the attribute. - :param attr: Attribute name as a string. It can be a Django lookup field such as - `project__owner__name`, in which case it will look for `obj.project.owner.name`. - - :return: The attribute value. - :raises: `AttributeError` if some attribute doesn't exist. - """ - chunks = attr.lstrip("-").split("__") - attr, chunks = chunks[0], chunks[1:] - obj = value = getattr(obj, attr) - for path in chunks: - value = getattr(obj, path) - obj = value - - return value - - -def transform_field_into_lookup(name, value, operator="", operator_if_desc=""): - """From a field name and value, return a dict that may be used as a queryset filter. - - :param name: Field name as a string. - :param value: Field value. - :param operator: Operator to use in the lookup. - :param operator_if_desc: If the field is reversed (a "-" in front) use this operator - instead. - - :return: A dict that may be used as a queryset filter. - """ - if name.startswith("-"): - name = name[1:] - operator = operator_if_desc - lookup = "{}{}".format(name, operator) - return {lookup: value} - - def get_neighbors(obj, results_set=None): """Get the neighbors of a model instance. @@ -89,51 +37,32 @@ def get_neighbors(obj, results_set=None): """ if results_set is None or results_set.count() == 0: results_set = type(obj).objects.get_queryset() + + compiler = results_set.query.get_compiler('default') + base_sql, base_params = compiler.as_sql(with_col_aliases=True) + + query = """ + SELECT * FROM + (SELECT "id" as id, ROW_NUMBER() OVER() + FROM (%s) as ID_AND_ROW) + AS SELECTED_ID_AND_ROW + """%(base_sql) + query += " WHERE id=%s;" + params = list(base_params) + [obj.id] + + cursor = connection.cursor() + cursor.execute(query, params) + row = cursor.fetchone() + obj_position = row[1] - 1 + try: - left = _left_candidates(obj, results_set).reverse()[0] + left = obj_position > 0 and results_set[obj_position - 1] or None except IndexError: left = None + try: - right = _right_candidates(obj, results_set)[0] + right = results_set[obj_position + 1] except IndexError: right = None return Neighbor(left, right) - - -def _get_candidates(obj, results_set, reverse=False): - ordering = (results_set.query.order_by or []) + list(obj._meta.ordering) - main_ordering, rest_ordering = ordering[0], ordering[1:] - try: - filters = obj.get_neighbors_additional_filters(results_set, ordering, reverse) - if filters is None: - raise AttributeError - except AttributeError: - filters = [order_field_as_filter(obj, main_ordering, reverse)] - filters += [ordering_fields_as_filter(obj, main_ordering, rest_ordering, reverse)] - - return (results_set - .filter(~Q(id=obj.id), disjunction_filters(filters)) - .distinct() - .order_by(*ordering)) -_left_candidates = partial(_get_candidates, reverse=True) -_right_candidates = partial(_get_candidates, reverse=False) - - -def order_field_as_filter(obj, order_field, reverse=None, operator=None): - value = get_attribute(obj, order_field) - if reverse is not None: - if operator is None: - operator = ("__gt", "__lt") - operator = (operator[1], operator[0]) if reverse else operator - else: - operator = () - return transform_field_into_lookup(order_field, value, *operator) - - -def ordering_fields_as_filter(obj, main_order_field, ordering_fields, reverse=False): - """Transform a list of ordering fields into a queryset filter.""" - filter = order_field_as_filter(obj, main_order_field) - for field in ordering_fields: - filter.update(order_field_as_filter(obj, field, reverse, ("__gte", "__lte"))) - return filter diff --git a/tests/integration/test_neighbors.py b/tests/integration/test_neighbors.py index 3281dcee..8cb37e90 100644 --- a/tests/integration/test_neighbors.py +++ b/tests/integration/test_neighbors.py @@ -36,48 +36,6 @@ def teardown_module(): reconnect_signals() -class TestGetAttribute: - def test_no_attribute(self, object): - object.full_name = "name" - with pytest.raises(AttributeError): - n.get_attribute(object, "name") - - with pytest.raises(AttributeError): - n.get_attribute(object, "full_name__last_name") - - def test_one_level(self, object): - object.name = "name" - assert n.get_attribute(object, "name") == object.name - - def test_two_levels(self, object): - object.name = object - object.name.full_name = "first name" - assert n.get_attribute(object, "name__full_name") == object.name.full_name - - def test_three_levels(self, object): - object.info = object - object.info.name = object - object.info.name.full_name = "first name" - assert n.get_attribute(object, "info__name__full_name") == object.info.name.full_name - - -def test_transform_field_into_lookup(): - transform = partial(n.transform_field_into_lookup, value="chuck", operator="__lt", - operator_if_desc="__gt") - - assert transform(name="name") == {"name__lt": "chuck"} - assert transform(name="-name") == {"name__gt": "chuck"} - - -def test_disjunction_filters(): - filters = [{"age__lt": 21, "name__eq": "chuck"}] - result_str = str(n.disjunction_filters(filters)) - - assert result_str.startswith("(OR: ") - assert "('age__lt', 21)" in result_str - assert "('name__eq', 'chuck')" in result_str - - @pytest.mark.django_db class TestUserStories: def test_no_filters(self): @@ -338,3 +296,36 @@ class TestIssues: assert issue1_neighbors.right == issue3 assert issue2_neighbors.left == issue3 assert issue2_neighbors.right is None + + def test_ordering_by_assigned_to_desc_with_none_values(self): + project = f.ProjectFactory.create() + + issue1 = f.IssueFactory.create(project=project, assigned_to=None) + issue2 = f.IssueFactory.create(project=project, assigned_to=None) + issue3 = f.IssueFactory.create(project=project, assigned_to=None) + + issues = Issue.objects.filter(project=project).order_by("-assigned_to__full_name") + issue1_neighbors = n.get_neighbors(issue1, results_set=issues) + issue2_neighbors = n.get_neighbors(issue2, results_set=issues) + + assert issue1_neighbors.left == issue2 + assert issue1_neighbors.right is None + assert issue2_neighbors.left == issue3 + assert issue2_neighbors.right == issue1 + + def test_ordering_by_assigned_to_desc_with_none_and_normal_values(self): + project = f.ProjectFactory.create() + assigned_to1 = f.UserFactory.create(full_name="Chuck Norris") + issue1 = f.IssueFactory.create(project=project, assigned_to=None) + issue2 = f.IssueFactory.create(project=project, assigned_to=assigned_to1) + issue3 = f.IssueFactory.create(project=project, assigned_to=None) + + issues = Issue.objects.filter(project=project).order_by("-assigned_to__full_name") + + issue1_neighbors = n.get_neighbors(issue1, results_set=issues) + issue2_neighbors = n.get_neighbors(issue2, results_set=issues) + + assert issue1_neighbors.left == issue3 + assert issue1_neighbors.right == issue2 + assert issue2_neighbors.left == issue1 + assert issue2_neighbors.right is None