Skip to content

Commit

Permalink
Add subquery and foreign field unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
WaVEV committed Dec 30, 2024
1 parent a2624f8 commit 2d75b39
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
7 changes: 7 additions & 0 deletions tests/model_fields_/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ class Author(models.Model):
class Book(models.Model):
name = models.CharField(max_length=100)
author = EmbeddedModelField(Author)


class Library(models.Model):
name = models.CharField(max_length=100)
books = models.ManyToManyField("Book", related_name="libraries")
location = models.CharField(max_length=100, null=True, blank=True)
best_seller = models.CharField(max_length=100, null=True, blank=True)
98 changes: 85 additions & 13 deletions tests/model_fields_/test_embedded_model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import operator
from datetime import timedelta
from decimal import Decimal

from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.db.models import ExpressionWrapper, F, IntegerField, Max, Model, Sum
from django.db.models import (
Exists,
ExpressionWrapper,
F,
IntegerField,
Max,
Model,
OuterRef,
Subquery,
Sum,
)
from django.test import SimpleTestCase, TestCase

from django_mongodb.fields import EmbeddedModelField
Expand All @@ -16,6 +25,7 @@
DecimalParent,
EmbeddedModel,
EmbeddedModelFieldModel,
Library,
)


Expand Down Expand Up @@ -185,9 +195,9 @@ def test_embedded_with_json_field(self):
models[1:3],
)

@staticmethod
def _truncate_ms(time):
return time - timedelta(microseconds=time.microsecond)
def truncate_ms(self, value):
"""Truncate microsends to millisecond precision as supported by MongoDB."""
return value.replace(microsecond=(value.microsecond // 1000) * 1000)

################
def test_ordering_by_embedded_field(self):
Expand All @@ -200,23 +210,26 @@ def test_ordering_by_embedded_field(self):
self.assertSequenceEqual(query, expected)

def test_ordering_grouping_by_embedded_field(self):
expected = sorted(
(
EmbeddedModelFieldModel.objects.create(simple=EmbeddedModel(someint=x))
for x in range(6)
),
key=lambda x: x.simple.someint,
)
query = (
EmbeddedModelFieldModel.objects.annotate(
group=ExpressionWrapper(F("simple__someint") + 5, output_field=IntegerField())
)
.values("group")
.annotate(max_pk=Max("simple__auto_now"))
.annotate(max_auto_now=Max("simple__auto_now"))
.order_by("simple__someint")
)
query = [{**e, "max_pk": self._truncate_ms(e["max_pk"])} for e in query]
expected = [
EmbeddedModelFieldModel.objects.create(simple=EmbeddedModel(someint=x))
for x in range(6)
]
query_response = [{**e, "max_auto_now": self.truncate_ms(e["max_auto_now"])} for e in query]
self.assertSequenceEqual(
query,
query_response,
[
{"group": e.simple.someint + 5, "max_pk": self._truncate_ms(e.simple.auto_now)}
{"group": e.simple.someint + 5, "max_auto_now": self.truncate_ms(e.simple.auto_now)}
for e in expected
],
)
Expand All @@ -229,3 +242,62 @@ def test_ordering_grouping_by_sum(self):
.order_by("sum")
)
self.assertQuerySetEqual(qs, [0, 2, 4, 6, 8, 10], operator.itemgetter("sum"))


class SubqueryExistsTestCase(TestCase):
def setUp(self):
# Create test data
address1 = Address.objects.create(city="New York", state="NY", zip_code=10001)
address2 = Address.objects.create(city="Boston", state="MA", zip_code=20002)
author1 = Author.objects.create(name="Alice", age=30, address=address1)
author2 = Author.objects.create(name="Bob", age=40, address=address2)
book1 = Book.objects.create(name="Book A", author=author1)
book2 = Book.objects.create(name="Book B", author=author2)
Book.objects.create(name="Book C", author=author2)
Book.objects.create(name="Book D", author=author2)
Book.objects.create(name="Book E", author=author1)

library1 = Library.objects.create(
name="Central Library", location="Downtown", best_seller="Book A"
)
library2 = Library.objects.create(
name="Community Library", location="Suburbs", best_seller="Book A"
)

# Add books to libraries
library1.books.add(book1, book2)
library2.books.add(book2)

def test_exists_subquery(self):
subquery = Book.objects.filter(
author__name=OuterRef("name"), author__address__city="Boston"
)
queryset = Author.objects.filter(Exists(subquery))

self.assertEqual(queryset.count(), 1)

def test_in_subquery(self):
subquery = Author.objects.filter(age__gt=35).values("name")
queryset = Book.objects.filter(author__name__in=Subquery(subquery)).order_by("name")

self.assertEqual(queryset.count(), 3)
self.assertQuerySetEqual(queryset, ["Book B", "Book C", "Book D"], lambda book: book.name)

def test_range_query(self):
queryset = Author.objects.filter(age__range=(25, 45)).order_by("name")

self.assertEqual(queryset.count(), 2)
self.assertQuerySetEqual(queryset, ["Alice", "Bob"], lambda author: author.name)

def test_exists_with_foreign_object(self):
subquery = Library.objects.filter(best_seller=OuterRef("name"))
queryset = Book.objects.filter(Exists(subquery))

self.assertEqual(queryset.count(), 1)
self.assertEqual(queryset.first().name, "Book A")

def test_foreign_field_with_ranges(self):
queryset = Library.objects.filter(books__author__age__range=(25, 35))

self.assertEqual(queryset.count(), 1)
self.assertEqual(queryset.first().name, "Central Library")

0 comments on commit 2d75b39

Please sign in to comment.