From 2d75b39dce7b16e13d82897d5704d83531919e48 Mon Sep 17 00:00:00 2001 From: Emanuel Lupi Date: Sun, 22 Dec 2024 16:35:50 -0300 Subject: [PATCH] Add subquery and foreign field unit tests. --- tests/model_fields_/models.py | 7 ++ tests/model_fields_/test_embedded_model.py | 98 +++++++++++++++++++--- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/tests/model_fields_/models.py b/tests/model_fields_/models.py index 87b24295..41451b14 100644 --- a/tests/model_fields_/models.py +++ b/tests/model_fields_/models.py @@ -61,3 +61,10 @@ class Author(models.Model): class Book(models.Model): name = models.CharField(max_length=100) author = EmbeddedModelField(Author) + + +class Library(models.Model): + name = models.CharField(max_length=100) + books = models.ManyToManyField("Book", related_name="libraries") + location = models.CharField(max_length=100, null=True, blank=True) + best_seller = models.CharField(max_length=100, null=True, blank=True) diff --git a/tests/model_fields_/test_embedded_model.py b/tests/model_fields_/test_embedded_model.py index 324fb666..8165bbaa 100644 --- a/tests/model_fields_/test_embedded_model.py +++ b/tests/model_fields_/test_embedded_model.py @@ -1,9 +1,18 @@ import operator -from datetime import timedelta from decimal import Decimal from django.core.exceptions import FieldDoesNotExist, ValidationError -from django.db.models import ExpressionWrapper, F, IntegerField, Max, Model, Sum +from django.db.models import ( + Exists, + ExpressionWrapper, + F, + IntegerField, + Max, + Model, + OuterRef, + Subquery, + Sum, +) from django.test import SimpleTestCase, TestCase from django_mongodb.fields import EmbeddedModelField @@ -16,6 +25,7 @@ DecimalParent, EmbeddedModel, EmbeddedModelFieldModel, + Library, ) @@ -185,9 +195,9 @@ def test_embedded_with_json_field(self): models[1:3], ) - @staticmethod - def _truncate_ms(time): - return time - timedelta(microseconds=time.microsecond) + def truncate_ms(self, value): + """Truncate microsends to millisecond precision as supported by MongoDB.""" + return value.replace(microsecond=(value.microsecond // 1000) * 1000) ################ def test_ordering_by_embedded_field(self): @@ -200,23 +210,26 @@ def test_ordering_by_embedded_field(self): self.assertSequenceEqual(query, expected) def test_ordering_grouping_by_embedded_field(self): + expected = sorted( + ( + EmbeddedModelFieldModel.objects.create(simple=EmbeddedModel(someint=x)) + for x in range(6) + ), + key=lambda x: x.simple.someint, + ) query = ( EmbeddedModelFieldModel.objects.annotate( group=ExpressionWrapper(F("simple__someint") + 5, output_field=IntegerField()) ) .values("group") - .annotate(max_pk=Max("simple__auto_now")) + .annotate(max_auto_now=Max("simple__auto_now")) .order_by("simple__someint") ) - query = [{**e, "max_pk": self._truncate_ms(e["max_pk"])} for e in query] - expected = [ - EmbeddedModelFieldModel.objects.create(simple=EmbeddedModel(someint=x)) - for x in range(6) - ] + query_response = [{**e, "max_auto_now": self.truncate_ms(e["max_auto_now"])} for e in query] self.assertSequenceEqual( - query, + query_response, [ - {"group": e.simple.someint + 5, "max_pk": self._truncate_ms(e.simple.auto_now)} + {"group": e.simple.someint + 5, "max_auto_now": self.truncate_ms(e.simple.auto_now)} for e in expected ], ) @@ -229,3 +242,62 @@ def test_ordering_grouping_by_sum(self): .order_by("sum") ) self.assertQuerySetEqual(qs, [0, 2, 4, 6, 8, 10], operator.itemgetter("sum")) + + +class SubqueryExistsTestCase(TestCase): + def setUp(self): + # Create test data + address1 = Address.objects.create(city="New York", state="NY", zip_code=10001) + address2 = Address.objects.create(city="Boston", state="MA", zip_code=20002) + author1 = Author.objects.create(name="Alice", age=30, address=address1) + author2 = Author.objects.create(name="Bob", age=40, address=address2) + book1 = Book.objects.create(name="Book A", author=author1) + book2 = Book.objects.create(name="Book B", author=author2) + Book.objects.create(name="Book C", author=author2) + Book.objects.create(name="Book D", author=author2) + Book.objects.create(name="Book E", author=author1) + + library1 = Library.objects.create( + name="Central Library", location="Downtown", best_seller="Book A" + ) + library2 = Library.objects.create( + name="Community Library", location="Suburbs", best_seller="Book A" + ) + + # Add books to libraries + library1.books.add(book1, book2) + library2.books.add(book2) + + def test_exists_subquery(self): + subquery = Book.objects.filter( + author__name=OuterRef("name"), author__address__city="Boston" + ) + queryset = Author.objects.filter(Exists(subquery)) + + self.assertEqual(queryset.count(), 1) + + def test_in_subquery(self): + subquery = Author.objects.filter(age__gt=35).values("name") + queryset = Book.objects.filter(author__name__in=Subquery(subquery)).order_by("name") + + self.assertEqual(queryset.count(), 3) + self.assertQuerySetEqual(queryset, ["Book B", "Book C", "Book D"], lambda book: book.name) + + def test_range_query(self): + queryset = Author.objects.filter(age__range=(25, 45)).order_by("name") + + self.assertEqual(queryset.count(), 2) + self.assertQuerySetEqual(queryset, ["Alice", "Bob"], lambda author: author.name) + + def test_exists_with_foreign_object(self): + subquery = Library.objects.filter(best_seller=OuterRef("name")) + queryset = Book.objects.filter(Exists(subquery)) + + self.assertEqual(queryset.count(), 1) + self.assertEqual(queryset.first().name, "Book A") + + def test_foreign_field_with_ranges(self): + queryset = Library.objects.filter(books__author__age__range=(25, 35)) + + self.assertEqual(queryset.count(), 1) + self.assertEqual(queryset.first().name, "Central Library")