From ede16f1070a9e5891ed9eca494ba256e0d971847 Mon Sep 17 00:00:00 2001 From: Javier Buzzi Date: Mon, 4 Mar 2024 14:13:17 -0500 Subject: [PATCH] Random fixes (#60) --- .github/workflows/_tests.yml | 5 ++ django_squash/db/migrations/autodetector.py | 30 ++++++-- setup.cfg | 6 +- tests/conftest.py | 26 +++++++ tests/test_migrations_autodetector.py | 85 +++++++++++++++++++++ 5 files changed, 143 insertions(+), 9 deletions(-) create mode 100644 tests/test_migrations_autodetector.py diff --git a/.github/workflows/_tests.yml b/.github/workflows/_tests.yml index dfc86db..b9a8932 100644 --- a/.github/workflows/_tests.yml +++ b/.github/workflows/_tests.yml @@ -60,6 +60,11 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Cache pip + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-py-${{ matrix.python-version }}-dj-${{ matrix.django-version }} - name: Install prerequisites run: | DJANGO="django==${{ matrix.django-version }}" diff --git a/django_squash/db/migrations/autodetector.py b/django_squash/db/migrations/autodetector.py index 6a97827..5adc298 100644 --- a/django_squash/db/migrations/autodetector.py +++ b/django_squash/db/migrations/autodetector.py @@ -11,12 +11,17 @@ from . import operators, utils +RESERVED_MIGRATION_KEYWORDS = ("_deleted", "_dependencies_change", "_replaces_change", "_original_migration") + class Migration(dj_migrations.Migration): - _deleted = False - _dependencies_change = False - _replaces_change = False + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._deleted = False + self._dependencies_change = False + self._replaces_change = False + self._original_migration = None def describe(self): if self._deleted: @@ -38,8 +43,17 @@ def __iter__(self): @classmethod def from_migration(cls, migration): - new = Migration(name=migration.name, app_label=migration.app_label) - new.__dict__ = migration.__dict__.copy() + if cls in type(migration).mro(): + return migration + + for keyword in RESERVED_MIGRATION_KEYWORDS: + if hasattr(migration, keyword): + raise RuntimeError( + 'Cannot use keyword "%s" in Migration %s.%s' % (keyword, migration.app_label, migration.name) + ) + + new = cls(name=migration.name, app_label=migration.app_label) + new.__dict__.update(migration.__dict__) new._original_migration = migration return new @@ -129,7 +143,7 @@ def rename_migrations(self, original, graph, changes, migration_name): migration_name or "squashed", ) - def convert_migration_references_to_objects(self, original, graph, changes): + def convert_migration_references_to_objects(self, original, changes, ignore_apps): """ Swap django.db.migrations.Migration with a custom one that behaves like a tuple when read, but is still an object for the purpose of easy renames. @@ -190,7 +204,7 @@ def create_deleted_models_migrations(self, loader, changes): instance.replaces = migrations changes[app_label] = [instance] - def squash(self, real_loader, squash_loader, ignore_apps=None, migration_name=None): + def squash(self, real_loader, squash_loader, ignore_apps, migration_name=None): changes_ = self.delete_old_squashed(real_loader, ignore_apps) graph = squash_loader.graph @@ -200,7 +214,7 @@ def squash(self, real_loader, squash_loader, ignore_apps=None, migration_name=No changes.pop(app, None) self.create_deleted_models_migrations(real_loader, changes) - self.convert_migration_references_to_objects(real_loader, graph, changes) + self.convert_migration_references_to_objects(real_loader, changes, ignore_apps) self.rename_migrations(real_loader, graph, changes, migration_name) self.replace_current_migrations(real_loader, graph, changes) self.add_non_elidables(real_loader, squash_loader, changes) diff --git a/setup.cfg b/setup.cfg index 1080924..566c575 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,8 +27,9 @@ repository=https://upload.pypi.org/legacy/ username=kingbuzzman [tool:pytest] -pythonpath=tests DJANGO_SETTINGS_MODULE=settings +pythonpath=tests +addopts="--pdbcls=IPython.terminal.debugger:TerminalPdb" python_files=test_*.py *_tests.py markers= temporary_migration_module @@ -45,3 +46,6 @@ filterwarnings= # Warning: cgi is only being used by Django 3.2 ignore:'cgi' is deprecated and slated for removal in Python 3.13 + + # Django 3.2 throws a warning about the USE_I18N setting being deprecated + ignore:datetime.datetime.utcnow\(\) is deprecated and scheduled for removal in a future version. Use timezone-aware objects .* diff --git a/tests/conftest.py b/tests/conftest.py index 88365cf..ef83a7b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import pytest from django.core.management import call_command +from django.db import connections from django.db.models.options import Options from django.test.utils import extend_sys_path from django.utils.module_loading import module_dir @@ -146,3 +147,28 @@ def _call_squash_migrations(*args, **kwargs): call_command("squash_migrations", *args, **kwargs) yield _call_squash_migrations + + +@pytest.fixture +def clean_db(django_db_blocker): + """ + Clean the database after each test. As in a new database. + + Usage: + + @pytest.mark.usefixtures("clean_db") + @pytest.mark.django_db(transaction=True) + def test_something(): + ... + """ + with django_db_blocker.unblock(): + # Because we're using an in memory sqlite database, we can just reset the connection + # When the connection is reestablished, the database will be empty + connections["default"].connection.close() + del connections["default"] + + with connections["default"].cursor() as c: + # make sure the database is of any tables + assert connections["default"].introspection.get_table_list(c) == [] + + yield diff --git a/tests/test_migrations_autodetector.py b/tests/test_migrations_autodetector.py new file mode 100644 index 0000000..79b6017 --- /dev/null +++ b/tests/test_migrations_autodetector.py @@ -0,0 +1,85 @@ +import pytest +from django.db.migrations import Migration as OriginalMigration + +from django_squash.db.migrations import autodetector, operators, utils + + +def test_migration(): + original = OriginalMigration("0001_inital", "app") + new = autodetector.Migration.from_migration(original) + assert new.name == "0001_inital" + assert new.app_label == "app" + assert new._original_migration == original + + assert new[0] == "app" + assert new[1] == "0001_inital" + assert list(new) == ["app", "0001_inital"] + + assert not list(new.describe()) + assert not new.is_migration_level + new._deleted = True + new._dependencies_change = True + new._replaces_change = True + assert new.is_migration_level + assert list(new.describe()) == ["Deleted", '"dependencies" changed', '"replaces" keyword removed'] + + +def test_migration_using_keywords(): + """ + Test that the migration can be created using our internal keywords + """ + + class FakeMigration: + app_label = "app" + name = "0001_inital" + + autodetector.Migration.from_migration(FakeMigration()) + + for keyword in autodetector.RESERVED_MIGRATION_KEYWORDS: + migration = OriginalMigration("0001_inital", "app") + fake_migration = FakeMigration() + new_migration = autodetector.Migration("0001_inital", "app") + + setattr(migration, keyword, True) + setattr(fake_migration, keyword, True) + setattr(new_migration, keyword, True) + + with pytest.raises(RuntimeError): + autodetector.Migration.from_migration(migration) + + with pytest.raises(RuntimeError): + autodetector.Migration.from_migration(fake_migration) + + autodetector.Migration.from_migration(new_migration) + + +def noop(*a, **k): + pass + + +def test_all_custom_operations(): + """ + Test that all_custom_operations returns the correct operations + """ + var = utils.UniqueVariableName() + + class Migration(autodetector.Migration): + operations = [ + operators.RunSQL("SELECT 1", elidable=True), + operators.RunPython(noop, elidable=True), + operators.RunSQL("SELECT 2", elidable=True), + ] + + assert list(autodetector.all_custom_operations(Migration.operations, var)) == [] + + class Migration(autodetector.Migration): + operations = [ + operators.RunSQL("SELECT 1", elidable=False), + operators.RunPython(noop, elidable=False), + operators.RunSQL("SELECT 2", elidable=True), + ] + + assert [type(x).__name__ for x in autodetector.all_custom_operations(Migration.operations, var)] == [ + "RunSQL", + "RunPython", + ]