From 828a4b118e50205a9ce0de374c93f227f8e374ce Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Sat, 6 Jul 2024 09:14:24 -0700 Subject: [PATCH] Stream Enhancements - Part 3 Changes: - Add `_event_position` temp variable in aggregates to track last event position in event store - Track expected version per event with `_event_position` - Fetch last event position on aggregate load and update `_event_position` - Unify method to append event to event store in aggregates and event sourced aggregates - Unify method to construct message from event for aggregates and event sourced aggregates - Track database state within the memory provider instead of a global object --- .vscode/launch.json | 2 +- src/protean/adapters/repository/memory.py | 27 +-- src/protean/container.py | 7 + src/protean/core/aggregate.py | 9 +- src/protean/core/entity.py | 18 +- src/protean/core/event.py | 3 + src/protean/core/event_sourced_aggregate.py | 5 + src/protean/core/event_sourced_repository.py | 2 + src/protean/core/repository.py | 14 +- src/protean/core/unit_of_work.py | 15 +- src/protean/port/event_store.py | 21 +- src/protean/reflection.py | 2 - src/protean/utils/mixins.py | 32 +-- .../postgresql/test_json_datatype.py | 14 +- .../repository/elasticsearch_repo/test_dao.py | 6 + .../elasticsearch_repo/test_repo.py | 1 + .../sqlite/test_transactions.py | 1 + .../events/test_aggregate_event_version.py | 208 ++++++++++++++++++ .../test_appending_aggregate_events.py | 8 +- tests/event_store/test_reading_all_streams.py | 10 +- .../test_reading_events_of_type.py | 14 +- .../test_reading_last_event_of_type.py | 26 +-- tests/event_store/test_reading_messages.py | 10 +- tests/message/test_message_to_object.py | 2 +- tests/message/test_object_to_message.py | 2 +- .../test_origin_stream_name_in_metadata.py | 12 +- tests/server/test_any_event_handler.py | 2 +- tests/server/test_error_handling.py | 4 +- tests/server/test_event_handling.py | 2 +- tests/server/test_handling_all_events.py | 6 +- ...st_message_filtering_with_origin_stream.py | 6 +- .../subscription/test_no_message_filtering.py | 6 +- .../test_read_position_updates.py | 6 +- tests/unit_of_work/test_uow_transactions.py | 8 +- 34 files changed, 360 insertions(+), 151 deletions(-) create mode 100644 tests/aggregate/events/test_aggregate_event_version.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 57c0dd41..b64d53d0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -66,7 +66,7 @@ "module": "pytest", "justMyCode": false, "args": [ - "tests/adapters/model/sqlalchemy_model/postgresql/test_array_datatype.py::test_array_data_type_association", + "tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py::test_persistence_of_json_with_array_data", "--postgresql" ] }, diff --git a/src/protean/adapters/repository/memory.py b/src/protean/adapters/repository/memory.py index 03054485..4c7d774c 100644 --- a/src/protean/adapters/repository/memory.py +++ b/src/protean/adapters/repository/memory.py @@ -21,12 +21,6 @@ from protean.reflection import attributes, fields, id_field from protean.utils.query import Q -# Global in-memory store of dict data. Keyed by name, to provide -# multiple named local memory caches. -_databases = defaultdict(dict) -_locks = defaultdict(Lock) -_counters = defaultdict(count) - def derive_schema_name(model_cls): if hasattr(model_cls.meta_, "schema_name"): @@ -63,9 +57,9 @@ def __init__(self, provider, new_connection=False): self._db = current_uow._sessions[self._provider.name]._db else: self._db = { - "data": copy.deepcopy(_databases), - "lock": _locks.setdefault(self._provider.name, Lock()), - "counters": _counters, + "data": copy.deepcopy(self._provider._databases), + "lock": self._provider._locks.setdefault(self._provider.name, Lock()), + "counters": self._provider._counters, } def add(self, element): @@ -84,8 +78,7 @@ def commit(self): if current_uow and self._provider.name in current_uow._sessions: current_uow._sessions[self._provider.name]._db["data"] = self._db["data"] else: - global _databases - _databases = self._db["data"] + self._provider._databases = self._db["data"] def rollback(self): pass @@ -104,6 +97,11 @@ def __init__(self, name, domain, conn_info: dict): conn_info["database"] = "memory" super().__init__(name, domain, conn_info) + # Global in-memory store of dict data. + self._databases = defaultdict(dict) + self._locks = defaultdict(Lock) + self._counters = defaultdict(count) + # A temporary cache of already constructed model classes self._model_classes = {} @@ -122,10 +120,9 @@ def get_connection(self, session_cls=None): def _data_reset(self): """Reset data""" - global _databases, _locks, _counters - _databases = defaultdict(dict) - _locks = defaultdict(Lock) - _counters = defaultdict(count) + self._databases = defaultdict(dict) + self._locks = defaultdict(Lock) + self._counters = defaultdict(count) # Discard any active Unit of Work if current_uow and current_uow.in_progress: diff --git a/src/protean/container.py b/src/protean/container.py index 1447bc63..093d1f14 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -342,6 +342,9 @@ def __setattr__(self, name, value): "_root", # Root entity in the hierarchy "_owner", # Owner entity in the hierarchy "_disable_invariant_checks", # Flag to disable invariant checks + "_next_version", # Temp placeholder to track next version of the entity + "_event_position", # Temp placeholder to track event version of the entity + "_expected_version", # Temp placeholder to track expected version of an event ] or name.startswith(("add_", "remove_", "get_one_from_", "filter_")) ): @@ -408,6 +411,7 @@ def raise_(self, event, fact_event=False) -> None: event_with_metadata = event.__class__( event.to_dict(), + _expected_version=self._event_position, _metadata={ "id": (f"{stream_name}-{identifier}-{self._version}"), "type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}", @@ -426,6 +430,9 @@ def raise_(self, event, fact_event=False) -> None: }, ) + # Increment the event position after generating event + self._event_position = self._event_position + 1 + self._events.append(event_with_metadata) diff --git a/src/protean/core/aggregate.py b/src/protean/core/aggregate.py index 1a521f6f..3a0c2225 100644 --- a/src/protean/core/aggregate.py +++ b/src/protean/core/aggregate.py @@ -49,7 +49,12 @@ def __new__(cls, *args, **kwargs): _version = Integer(default=-1) # Temporary variable to track next version of Aggregate - _next_version = Integer(default=0) + _next_version = 0 + + # Temporary variable to track version of events of Aggregate + # This can be different from the version of the Aggregate itself because + # a single aggregate update could have triggered multiple events. + _event_position = -1 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -95,7 +100,7 @@ def element_to_fact_event(element_cls): attrs = { key: value for key, value in fields(element_cls).items() - if not isinstance(value, Reference) and key not in ["_next_version"] + if not isinstance(value, Reference) } # Recursively convert HasOne and HasMany associations to Value Objects diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index f58719b1..18b08e9d 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -8,7 +8,12 @@ from functools import partial from protean.container import BaseContainer, IdentityMixin, OptionsMixin -from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError +from protean.exceptions import ( + ConfigurationError, + IncorrectUsageError, + NotSupportedError, + ValidationError, +) from protean.fields import Auto, HasMany, Reference, ValueObject from protean.fields.association import Association from protean.reflection import ( @@ -427,6 +432,13 @@ def raise_(self, event) -> None: The event is always registered on the aggregate, irrespective of where it is raised in the entity cluster.""" + # Verify that event is indeed associated with this aggregate + if event.meta_.part_of != self._root.__class__: + raise ConfigurationError( + f"Event `{event.__class__.__name__}` is not associated with" + f" aggregate `{self._root.__class__.__name__}`" + ) + # Events are sometimes raised from within the aggregate, well-before persistence. # In that case, the aggregate's next version has to be considered in events, # because we want to associate the event with the version that will be persisted. @@ -452,6 +464,7 @@ def raise_(self, event) -> None: event_with_metadata = event.__class__( event.to_dict(), + _expected_version=self._root._event_position, _metadata={ "id": ( f"{stream_name}-{identifier}-{aggregate_version}.{event_number}" @@ -472,6 +485,9 @@ def raise_(self, event) -> None: }, ) + # Increment the event position after generating event + self._root._event_position = self._root._event_position + 1 + self._root._events.append(event_with_metadata) def __eq__(self, other): diff --git a/src/protean/core/event.py b/src/protean/core/event.py index c6c4afff..526970f5 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -117,6 +117,9 @@ def __track_id_field(subclass): def __init__(self, *args, **kwargs): super().__init__(*args, finalize=False, **kwargs) + # Store the expected version temporarily for use during persistence + self._expected_version = kwargs.pop("_expected_version", -1) + version = ( self.__class__.__version__ if hasattr(self.__class__, "__version__") diff --git a/src/protean/core/event_sourced_aggregate.py b/src/protean/core/event_sourced_aggregate.py index 0ba0e7ea..c240ae60 100644 --- a/src/protean/core/event_sourced_aggregate.py +++ b/src/protean/core/event_sourced_aggregate.py @@ -33,6 +33,11 @@ class BaseEventSourcedAggregate( # Track current version of Aggregate _version = Integer(default=-1) + # Temporary variable to track version of events of Aggregate + # This can be different from the version of the Aggregate itself because + # a single aggregate update could have triggered multiple events. + _event_position = -1 + def __new__(cls, *args, **kwargs): if cls is BaseEventSourcedAggregate: raise NotSupportedError("BaseEventSourcedAggregate cannot be instantiated") diff --git a/src/protean/core/event_sourced_repository.py b/src/protean/core/event_sourced_repository.py index f799f00e..6aa03bd9 100644 --- a/src/protean/core/event_sourced_repository.py +++ b/src/protean/core/event_sourced_repository.py @@ -96,6 +96,8 @@ def get(self, identifier: Identifier) -> BaseEventSourcedAggregate: } ) + aggregate._event_position = aggregate._version + return aggregate diff --git a/src/protean/core/repository.py b/src/protean/core/repository.py index ecd0cee5..ac2ea580 100644 --- a/src/protean/core/repository.py +++ b/src/protean/core/repository.py @@ -9,7 +9,7 @@ from protean.core.unit_of_work import UnitOfWork from protean.exceptions import IncorrectUsageError, NotSupportedError from protean.fields import HasMany, HasOne -from protean.globals import current_uow +from protean.globals import current_domain, current_uow from protean.port.dao import BaseDAO from protean.port.provider import BaseProvider from protean.reflection import association_fields, has_association_fields @@ -139,7 +139,6 @@ def add(self, aggregate: BaseAggregate) -> BaseAggregate: # noqa: C901 # Remove state attribute from the payload, as it is not needed for the Fact Event payload.pop("state_", None) - payload.pop("_next_version", None) # Construct and raise the Fact Event fact_event = aggregate._fact_event_cls(**payload) @@ -246,7 +245,16 @@ def get(self, identifier) -> BaseAggregate: `find_residents_of_area(zipcode)`, etc. It is also possible to make use of more complicated, domain-friendly design patterns like the `Specification` pattern. """ - return self._dao.get(identifier) + aggregate = self._dao.get(identifier) + + # Fetch and sync events version + last_message = current_domain.event_store.store.read_last_message( + f"{aggregate.meta_.stream_name}-{identifier}" + ) + if last_message: + aggregate._event_position = last_message.position + + return aggregate def repository_factory(element_cls, **opts): diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index f7cb9c19..50151fc0 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -8,7 +8,7 @@ ) from protean.globals import _uow_context_stack, current_domain from protean.reflection import id_field -from protean.utils import DomainObjects, EventProcessing, fqn +from protean.utils import EventProcessing, fqn logger = logging.getLogger(__name__) @@ -78,16 +78,9 @@ def commit(self): # noqa: C901 events = [] for _, item in self._identity_map.items(): if item._events: - if item.element_type == DomainObjects.EVENT_SOURCED_AGGREGATE: - for event in item._events: - current_domain.event_store.store.append_aggregate_event( - item, event - ) - events.append((item, event)) - else: - for event in item._events: - current_domain.event_store.store.append(event) - events.append((item, event)) + for event in item._events: + current_domain.event_store.store.append(event) + events.append((item, event)) item._events = [] # Iteratively consume all events produced in this session diff --git a/src/protean/port/event_store.py b/src/protean/port/event_store.py index 5a2fc49c..57c870ce 100644 --- a/src/protean/port/event_store.py +++ b/src/protean/port/event_store.py @@ -85,12 +85,13 @@ def read( def read_last_message(self, stream_name) -> Message: # FIXME Rename to read_last_stream_message raw_message = self._read_last_message(stream_name) - return Message.from_dict(raw_message) + if raw_message: + return Message.from_dict(raw_message) - def append_aggregate_event( - self, aggregate: BaseEventSourcedAggregate, event: BaseEvent - ) -> int: - message = Message.to_aggregate_event_message(aggregate, event) + return None + + def append(self, object: Union[BaseEvent, BaseCommand]) -> int: + message = Message.to_message(object) position = self._write( message.stream_name, @@ -102,16 +103,6 @@ def append_aggregate_event( return position - def append(self, object: Union[BaseEvent, BaseCommand]) -> int: - message = Message.to_message(object) - - return self._write( - message.stream_name, - message.type, - message.data, - metadata=message.metadata.to_dict(), - ) - def load_aggregate( self, part_of: Type[BaseEventSourcedAggregate], identifier: Identifier ) -> Optional[BaseEventSourcedAggregate]: diff --git a/src/protean/reflection.py b/src/protean/reflection.py index 3975b59c..3c8038de 100644 --- a/src/protean/reflection.py +++ b/src/protean/reflection.py @@ -34,7 +34,6 @@ def data_fields(class_or_instance): fields_dict = dict(getattr(class_or_instance, _FIELDS)) # Remove internal fields - fields_dict.pop("_next_version", None) fields_dict.pop("_metadata", None) except AttributeError: raise IncorrectUsageError( @@ -118,7 +117,6 @@ def declared_fields(class_or_instance): # Remove internal fields fields_dict.pop("_version", None) - fields_dict.pop("_next_version", None) fields_dict.pop("_metadata", None) except AttributeError: raise IncorrectUsageError( diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index 314a53d5..32b0b823 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -10,7 +10,6 @@ from protean.container import BaseContainer, OptionsMixin from protean.core.command import BaseCommand from protean.core.event import BaseEvent, Metadata -from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate from protean.core.unit_of_work import UnitOfWork from protean.exceptions import ConfigurationError from protean.globals import current_domain @@ -81,25 +80,6 @@ def from_dict(cls, message: Dict) -> Message: id=message["id"], ) - @classmethod - def to_aggregate_event_message( - cls, aggregate: BaseEventSourcedAggregate, event: BaseEvent - ) -> Message: - # If this is a Fact Event, don't set an expected version. - # Otherwise, expect the previous version - if event.__class__.__name__.endswith("FactEvent"): - expected_version = None - else: - expected_version = int(event._metadata.sequence_id) - 1 - - return cls( - stream_name=event._metadata.stream_name, - type=fully_qualified_name(event.__class__), - data=event.to_dict(), - metadata=event._metadata, - expected_version=expected_version, - ) - def to_object(self) -> Union[BaseEvent, BaseCommand]: if self.metadata.kind == MessageType.EVENT.value: element_record = current_domain.registry.events[self.type] @@ -123,13 +103,21 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: "Either specify an explicit stream name or associate the event with an aggregate." ) - stream_name = message_object._metadata.stream_name + # Set the expected version of the stream + # Applies only to events + expected_version = None + if message_object._metadata.kind == MessageType.EVENT.value: + # If this is a Fact Event, don't set an expected version. + # Otherwise, expect the previous version + if not message_object.__class__.__name__.endswith("FactEvent"): + expected_version = message_object._expected_version return cls( - stream_name=stream_name, + stream_name=message_object._metadata.stream_name, type=fully_qualified_name(message_object.__class__), data=message_object.to_dict(), metadata=message_object._metadata, + expected_version=expected_version, ) diff --git a/tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py b/tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py index efbc3fd7..405ebac9 100644 --- a/tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py +++ b/tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py @@ -13,18 +13,20 @@ class Event(BaseAggregate): payload = Dict() -@pytest.mark.postgresql -def test_json_data_type_association(test_domain): +@pytest.fixture(autouse=True) +def register_elements(test_domain): test_domain.register(Event) + test_domain.init(traverse=False) + +@pytest.mark.postgresql +def test_json_data_type_association(test_domain): model_cls = test_domain.repository_for(Event)._model type(model_cls.payload.property.columns[0].type) is sa_types.JSON @pytest.mark.postgresql def test_basic_dict_data_type_operations(test_domain): - test_domain.register(Event) - model_cls = test_domain.repository_for(Event)._model event = Event( @@ -39,8 +41,6 @@ def test_basic_dict_data_type_operations(test_domain): @pytest.mark.postgresql def test_json_with_array_data(test_domain): - test_domain.register(Event) - model_cls = test_domain.repository_for(Event)._model event = Event( @@ -62,8 +62,6 @@ def test_json_with_array_data(test_domain): @pytest.mark.postgresql def test_persistence_of_json_with_array_data(test_domain): - test_domain.register(Event) - event = Event( name="UserCreated", payload=[ diff --git a/tests/adapters/repository/elasticsearch_repo/test_dao.py b/tests/adapters/repository/elasticsearch_repo/test_dao.py index c52d085e..39735730 100644 --- a/tests/adapters/repository/elasticsearch_repo/test_dao.py +++ b/tests/adapters/repository/elasticsearch_repo/test_dao.py @@ -17,6 +17,7 @@ class TestDAO: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) def test_successful_initialization_of_dao(self, test_domain): test_domain.repository_for(Person)._dao.query.all() @@ -59,6 +60,7 @@ class TestDAODeleteFunctionality: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) def test_delete_an_object_in_repository_by_id(self, test_domain): """Delete an object in the repository by ID""" @@ -214,6 +216,7 @@ class TestDAORetrievalFunctionality: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) @pytest.fixture def identifier(self): @@ -670,6 +673,7 @@ class TestDAOSaveFunctionality: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) def test_creation_throws_error_on_missing_fields(self, test_domain): """Add an entity to the repository missing a required attribute""" @@ -712,6 +716,7 @@ class TestDAOUpdateFunctionality: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) def test_update_an_existing_entity_in_the_repository(self, test_domain): identifier = uuid4() @@ -902,6 +907,7 @@ class TestDAOValidations: def register_elements(self, test_domain): test_domain.register(Person) test_domain.register(User) + test_domain.init(traverse=False) @pytest.mark.xfail def test_unique(self, test_domain): diff --git a/tests/adapters/repository/elasticsearch_repo/test_repo.py b/tests/adapters/repository/elasticsearch_repo/test_repo.py index d3cfac8c..ba756e65 100644 --- a/tests/adapters/repository/elasticsearch_repo/test_repo.py +++ b/tests/adapters/repository/elasticsearch_repo/test_repo.py @@ -12,6 +12,7 @@ class TestElasticsearchRepository: @pytest.fixture(autouse=True) def register_elements(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) @pytest.fixture def identifier(self): diff --git a/tests/adapters/repository/sqlalchemy_repo/sqlite/test_transactions.py b/tests/adapters/repository/sqlalchemy_repo/sqlite/test_transactions.py index da7a392b..a0850c95 100644 --- a/tests/adapters/repository/sqlalchemy_repo/sqlite/test_transactions.py +++ b/tests/adapters/repository/sqlalchemy_repo/sqlite/test_transactions.py @@ -22,6 +22,7 @@ def clear_uow(self): def register_elements(self, test_domain): test_domain.register(Person) test_domain.register(PersonRepository, part_of=Person) + test_domain.init(traverse=False) def random_name(self): return "".join(random.choices(string.ascii_uppercase + string.digits, k=15)) diff --git a/tests/aggregate/events/test_aggregate_event_version.py b/tests/aggregate/events/test_aggregate_event_version.py new file mode 100644 index 00000000..947e20de --- /dev/null +++ b/tests/aggregate/events/test_aggregate_event_version.py @@ -0,0 +1,208 @@ +from enum import Enum + +import pytest + +from protean import BaseAggregate, BaseEntity, BaseEvent +from protean.fields import HasOne, Identifier, String + + +class UserStatus(Enum): + INACTIVE = "INACTIVE" + ACTIVE = "ACTIVE" + ARCHIVED = "ARCHIVED" + + +class Account(BaseEntity): + password_hash = String(max_length=512) + + def change_password(self, password): + self.password_hash = password + self.raise_(PasswordChanged(account_id=self.id, user_id=self.user_id)) + + +class PasswordChanged(BaseEvent): + account_id = Identifier(required=True) + user_id = Identifier(required=True) + + +class User(BaseAggregate): + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=UserStatus, default=UserStatus.INACTIVE.value) + + account = HasOne(Account) + + @classmethod + def register(cls, name, email): + user = cls(name=name, email=email) + user.raise_(UserRegistered(user_id=user.id, name=name, email=email)) + + return user + + def activate(self): + self.status = UserStatus.ACTIVE.value + self.raise_(UserActivated(user_id=self.id)) + + def change_email(self, email): + # This method generates no events + self.email = email + + def change_name(self, name): + self.name = name + self.raise_(UserRenamed(user_id=self.id, name=name)) + + +class UserRegistered(BaseEvent): + user_id = Identifier(required=True) + name = String(max_length=50, required=True) + email = String(required=True) + + +class UserActivated(BaseEvent): + user_id = Identifier(required=True) + + +class UserRenamed(BaseEvent): + user_id = Identifier(required=True) + name = String(required=True, max_length=50) + + +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(Account, part_of=User) + test_domain.register(UserRegistered, part_of=User) + test_domain.register(UserActivated, part_of=User) + test_domain.register(UserRenamed, part_of=User) + test_domain.register(PasswordChanged, part_of=User) + test_domain.init(traverse=False) + + +@pytest.fixture +def user(): + return User.register(name="John Doe", email="john.doe@gmail.com") + + +def test_aggregate_tracks_event_version(user): + assert user._version == -1 + + # The aggregate's event position would have been incremented + assert user._event_position == 0 + + # Check for expected version inside the event + assert user._events[0]._expected_version == -1 + + +def test_aggregate_tracks_event_version_after_first_update(user, test_domain): + assert user._events[0]._expected_version == -1 + + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + + assert refreshed_user._version == 0 + assert refreshed_user._event_position == 0 + + +def test_aggregate_tracks_event_version_after_multiple_updates(user, test_domain): + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.activate() + + assert refreshed_user._events[0]._expected_version == 0 + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 1 + assert refreshed_user._event_position == 1 + + +def test_aggregate_manages_event_version_with_an_update_and_no_events(test_domain): + # We initialize user directly here to avoid raising events + user = User(name="John Doe", email="john.doe@gmail.com") + + assert len(user._events) == 0 + + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 0 + assert refreshed_user._event_position == -1 + + +def test_aggregate_manages_event_version_with_multiple_updates_and_no_events( + test_domain, +): + user = User(name="John Doe", email="john.doe@gmail.com") + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.change_email("jane.doe@gmail.com") + + assert len(user._events) == 0 + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 1 + assert refreshed_user._event_position == -1 + + +def test_aggregate_tracks_event_version_after_an_update_with_multiple_events( + user, test_domain +): + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.change_name("Jane Doe") + refreshed_user.activate() + + assert refreshed_user._events[0]._expected_version == 0 + assert refreshed_user._events[1]._expected_version == 1 + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 1 + assert refreshed_user._event_position == 2 + + +@pytest.mark.xfail +def test_aggregate_tracks_event_version_after_multiple_updates_with_multiple_events( + user, test_domain +): + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.change_name("Jane Doe") + refreshed_user.activate() + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + + assert refreshed_user._version == 1 + assert refreshed_user._event_position == 2 + + refreshed_user.account = Account(password_hash="hashed_password") + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + + assert refreshed_user._version == 2 + assert refreshed_user._event_position == 2 + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.account.change_password("new_password") + + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + + # FIXME This is a bug. Version and event position should be 3 + # The problem is that the aggregate root is not aware of changes within its child entities + assert refreshed_user._version == 3 + assert refreshed_user._event_position == 3 diff --git a/tests/event_store/test_appending_aggregate_events.py b/tests/event_store/test_appending_aggregate_events.py index c3218208..0f5053ea 100644 --- a/tests/event_store/test_appending_aggregate_events.py +++ b/tests/event_store/test_appending_aggregate_events.py @@ -70,7 +70,7 @@ def register_elements(test_domain): def test_appending_messages_to_aggregate(test_domain): identifier = str(uuid4()) user = User.register(id=identifier, email="john.doe@example.com", name="John Doe") - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) messages = test_domain.event_store.store._read("user") @@ -81,19 +81,19 @@ def test_appending_messages_to_aggregate(test_domain): def test_version_increment_on_new_event(test_domain): identifier = str(uuid4()) user = User.register(id=identifier, email="john.doe@example.com", name="John Doe") - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) events = test_domain.event_store.store._read(f"user-{identifier}") assert events[0]["position"] == 0 user.activate() - test_domain.event_store.store.append_aggregate_event(user, user._events[1]) + test_domain.event_store.store.append(user._events[1]) events = test_domain.event_store.store._read(f"user-{identifier}") assert events[-1]["position"] == 1 user.rename(name="John Doe 2") - test_domain.event_store.store.append_aggregate_event(user, user._events[2]) + test_domain.event_store.store.append(user._events[2]) events = test_domain.event_store.store._read(f"user-{identifier}") assert events[-1]["position"] == 2 diff --git a/tests/event_store/test_reading_all_streams.py b/tests/event_store/test_reading_all_streams.py index 11a14c7c..58ec623a 100644 --- a/tests/event_store/test_reading_all_streams.py +++ b/tests/event_store/test_reading_all_streams.py @@ -89,20 +89,20 @@ def test_reading_messages_from_all_streams(test_domain): user = User.register( id=user_identifier, email="john.doe@example.com", name="John Doe" ) - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) user.activate() - test_domain.event_store.store.append_aggregate_event(user, user._events[1]) + test_domain.event_store.store.append(user._events[1]) user.rename(name="Johnny Doe") - test_domain.event_store.store.append_aggregate_event(user, user._events[2]) + test_domain.event_store.store.append(user._events[2]) post_identifier = str(uuid4()) post = Post.create(id=post_identifier, topic="Foo", content="Bar") - test_domain.event_store.store.append_aggregate_event(post, post._events[0]) + test_domain.event_store.store.append(post._events[0]) post.publish() - test_domain.event_store.store.append_aggregate_event(post, post._events[1]) + test_domain.event_store.store.append(post._events[1]) messages = test_domain.event_store.store.read("$all") assert len(messages) == 5 diff --git a/tests/event_store/test_reading_events_of_type.py b/tests/event_store/test_reading_events_of_type.py index 1b14ac1a..5185debc 100644 --- a/tests/event_store/test_reading_events_of_type.py +++ b/tests/event_store/test_reading_events_of_type.py @@ -57,7 +57,7 @@ def registered_user(test_domain): identifier = str(uuid4()) user = User.register(id=identifier, email="john.doe@example.com", name="John Doe") - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) return user @@ -72,9 +72,7 @@ def test_reading_events_of_type_with_just_one_message(test_domain, registered_us @pytest.mark.eventstore def test_reading_events_of_type_with_other_events_present(test_domain, registered_user): registered_user.activate() - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[1] - ) + test_domain.event_store.store.append(registered_user._events[1]) assert isinstance(test_domain.event_store.events_of_type(Registered)[0], Registered) assert isinstance(test_domain.event_store.events_of_type(Activated)[0], Activated) @@ -84,15 +82,11 @@ class TestEventStoreEventsOfType: @pytest.fixture(autouse=True) def activate_and_rename(self, registered_user, test_domain): registered_user.activate() - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[1] - ) + test_domain.event_store.store.append(registered_user._events[1]) for i in range(10): registered_user.rename(name=f"John Doe {i}") - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) yield diff --git a/tests/event_store/test_reading_last_event_of_type.py b/tests/event_store/test_reading_last_event_of_type.py index 7deb9a5d..40de226a 100644 --- a/tests/event_store/test_reading_last_event_of_type.py +++ b/tests/event_store/test_reading_last_event_of_type.py @@ -57,7 +57,7 @@ def registered_user(test_domain): identifier = str(uuid4()) user = User.register(id=identifier, email="john.doe@example.com", name="John Doe") - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) return user @@ -73,9 +73,7 @@ def test_reading_the_last_event_of_type_with_other_events_present( test_domain, registered_user ): registered_user.activate() - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[1] - ) + test_domain.event_store.store.append(registered_user._events[1]) assert isinstance( test_domain.event_store.last_event_of_type(Registered), Registered @@ -87,15 +85,11 @@ class TestEventStoreEventsOfType: @pytest.fixture(autouse=True) def activate_and_rename(self, registered_user, test_domain): registered_user.activate() - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[1] - ) + test_domain.event_store.store.append(registered_user._events[1]) for i in range(10): registered_user.rename(name=f"John Doe {i}") - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) yield @@ -105,9 +99,7 @@ def test_reading_the_last_event_of_type_with_multiple_events( ): for i in range(10): registered_user.rename(name=f"John Doe {i}") - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) event = test_domain.event_store.last_event_of_type(Renamed) assert event.name == "John Doe 9" @@ -118,9 +110,7 @@ def test_reading_the_last_event_of_type_with_multiple_events_in_stream( ): for i in range(10): registered_user.rename(name=f"John Doe {i}") - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) event = test_domain.event_store.last_event_of_type(Renamed, "user") assert event.name == "John Doe 9" @@ -131,9 +121,7 @@ def test_reading_the_last_event_of_type_with_multiple_events_in_different_stream ): for i in range(10): registered_user.rename(name=f"John Doe {i}") - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) event = test_domain.event_store.last_event_of_type(Renamed, "group") assert event is None diff --git a/tests/event_store/test_reading_messages.py b/tests/event_store/test_reading_messages.py index 23c458f3..38d58281 100644 --- a/tests/event_store/test_reading_messages.py +++ b/tests/event_store/test_reading_messages.py @@ -44,7 +44,7 @@ def registered_user(test_domain): identifier = str(uuid4()) user = User(id=identifier, email="john.doe@example.com") user.raise_(Registered(id=identifier, email="john.doe@example.com")) - test_domain.event_store.store.append_aggregate_event(user, user._events[-1]) + test_domain.event_store.store.append(user._events[-1]) return user @@ -52,9 +52,7 @@ def registered_user(test_domain): @pytest.fixture def activated_user(test_domain, registered_user): registered_user.raise_(Activated(id=registered_user.id)) - test_domain.event_store.store.append_aggregate_event( - registered_user, registered_user._events[-1] - ) + test_domain.event_store.store.append(registered_user._events[-1]) return registered_user @@ -63,9 +61,7 @@ def activated_user(test_domain, registered_user): def renamed_user(test_domain, activated_user): for i in range(10): activated_user.raise_(Renamed(id=activated_user.id, name=f"John Doe {i}")) - test_domain.event_store.store.append_aggregate_event( - activated_user, activated_user._events[-1] - ) + test_domain.event_store.store.append(activated_user._events[-1]) return activated_user diff --git a/tests/message/test_message_to_object.py b/tests/message/test_message_to_object.py index 54aeff11..72f4e281 100644 --- a/tests/message/test_message_to_object.py +++ b/tests/message/test_message_to_object.py @@ -49,7 +49,7 @@ def test_construct_event_from_message(): identifier = str(uuid4()) user = User(id=identifier, email="john.doe@gmail.com", name="John Doe") user.raise_(Registered(id=identifier, email="john.doe@gmail.com", name="John Doe")) - message = Message.to_aggregate_event_message(user, user._events[-1]) + message = Message.to_message(user._events[-1]) reconstructed_event = message.to_object() assert isinstance(reconstructed_event, Registered) diff --git a/tests/message/test_object_to_message.py b/tests/message/test_object_to_message.py index 6f4f9892..31ca12d8 100644 --- a/tests/message/test_object_to_message.py +++ b/tests/message/test_object_to_message.py @@ -53,7 +53,7 @@ def test_construct_message_from_event(test_domain): user.raise_(Registered(id=identifier, email="john.doe@gmail.com", name="John Doe")) # This simulates the call by UnitOfWork - message = Message.to_aggregate_event_message(user, user._events[-1]) + message = Message.to_message(user._events[-1]) assert message is not None assert type(message) is Message diff --git a/tests/message/test_origin_stream_name_in_metadata.py b/tests/message/test_origin_stream_name_in_metadata.py index 3d82c264..a20f882c 100644 --- a/tests/message/test_origin_stream_name_in_metadata.py +++ b/tests/message/test_origin_stream_name_in_metadata.py @@ -71,13 +71,15 @@ def test_origin_stream_name_in_event_from_command_without_origin_stream_name( ): g.message_in_context = register_command_message - event_message = Message.to_message( + user = User(id=user_id, email="john.doe@gmail.com", name="John Doe") + user.raise_( Registered( user_id=user_id, email="john.doe@gmail.com", name="John Doe", ) ) + event_message = Message.to_message(user._events[-1]) assert event_message.metadata.origin_stream_name is None @@ -91,13 +93,15 @@ def test_origin_stream_name_in_event_from_command_with_origin_stream_name( ) # Metadata is a VO and immutable, so creating a copy with updated value g.message_in_context = command_message - event_message = Message.to_message( + user = User(id=user_id, email="john.doe@gmail.com", name="John Doe") + user.raise_( Registered( user_id=user_id, email="john.doe@gmail.com", name="John Doe", ) ) + event_message = Message.to_message(user._events[-1]) assert event_message.metadata.origin_stream_name == "foo" @@ -118,7 +122,7 @@ def test_origin_stream_name_in_aggregate_event_from_command_without_origin_strea name="John Doe", ) ) - event_message = Message.to_aggregate_event_message(user, user._events[-1]) + event_message = Message.to_message(user._events[-1]) assert event_message.metadata.origin_stream_name is None @@ -145,7 +149,7 @@ def test_origin_stream_name_in_aggregate_event_from_command_with_origin_stream_n name="John Doe", ) ) - event_message = Message.to_aggregate_event_message(user, user._events[-1]) + event_message = Message.to_message(user._events[-1]) assert event_message.metadata.origin_stream_name == "foo" diff --git a/tests/server/test_any_event_handler.py b/tests/server/test_any_event_handler.py index df4951f7..36a48b96 100644 --- a/tests/server/test_any_event_handler.py +++ b/tests/server/test_any_event_handler.py @@ -55,7 +55,7 @@ async def test_that_an_event_handler_can_be_associated_with_an_all_stream(test_d password_hash="hash", ) ) - message = Message.to_aggregate_event_message(user, user._events[-1]) + message = Message.to_message(user._events[-1]) engine = Engine(domain=test_domain, test_mode=True) await engine.handle_message(UserEventHandler, message) diff --git a/tests/server/test_error_handling.py b/tests/server/test_error_handling.py index a94f2d6e..8f2fca26 100644 --- a/tests/server/test_error_handling.py +++ b/tests/server/test_error_handling.py @@ -67,7 +67,7 @@ async def test_that_exception_is_raised(test_domain): password_hash="hash", ) ) - message = Message.to_aggregate_event_message(user, user._events[-1]) + message = Message.to_message(user._events[-1]) engine = Engine(domain=test_domain, test_mode=True) @@ -96,7 +96,7 @@ def test_exceptions_stop_processing(test_domain): password_hash="hash", ) ) - test_domain.event_store.store.append_aggregate_event(user, user._events[0]) + test_domain.event_store.store.append(user._events[0]) engine = Engine(domain=test_domain) engine.run() diff --git a/tests/server/test_event_handling.py b/tests/server/test_event_handling.py index aacc83e1..53a1c3ec 100644 --- a/tests/server/test_event_handling.py +++ b/tests/server/test_event_handling.py @@ -57,7 +57,7 @@ async def test_handler_invocation(test_domain): password_hash="hash", ) ) - message = Message.to_aggregate_event_message(user, user._events[-1]) + message = Message.to_message(user._events[-1]) engine = Engine(domain=test_domain, test_mode=True) await engine.handle_message(UserEventHandler, message) diff --git a/tests/server/test_handling_all_events.py b/tests/server/test_handling_all_events.py index f9510414..a66fbaba 100644 --- a/tests/server/test_handling_all_events.py +++ b/tests/server/test_handling_all_events.py @@ -69,7 +69,7 @@ async def test_that_any_message_can_be_handled_with_any_handler(test_domain): password_hash="hash", ) ) - message1 = Message.to_aggregate_event_message(user, user._events[-1]) + message1 = Message.to_message(user._events[-1]) post_identifier = str(uuid4()) post = Post( @@ -79,8 +79,8 @@ async def test_that_any_message_can_be_handled_with_any_handler(test_domain): ) post.raise_(Created(id=post_identifier, topic="Foo", content="Bar")) - test_domain.event_store.store.append_aggregate_event(post, post._events[-1]) - message2 = Message.to_aggregate_event_message(post, post._events[-1]) + test_domain.event_store.store.append(post._events[-1]) + message2 = Message.to_message(post._events[-1]) engine = Engine(domain=test_domain, test_mode=True) await engine.handle_message(SystemMetrics, message1) diff --git a/tests/subscription/test_message_filtering_with_origin_stream.py b/tests/subscription/test_message_filtering_with_origin_stream.py index 14cfd481..be3595c7 100644 --- a/tests/subscription/test_message_filtering_with_origin_stream.py +++ b/tests/subscription/test_message_filtering_with_origin_stream.py @@ -94,9 +94,9 @@ async def test_message_filtering_for_event_handlers_with_defined_origin_stream( email.raise_(Sent(email="john.doe@gmail.com", sent_at=datetime.now(UTC))) # Construct 3 dummy messages and modify Sent message to have originated from the user stream messages = [ - Message.to_aggregate_event_message(user, user._events[0]), - Message.to_aggregate_event_message(user, user._events[1]), - Message.to_aggregate_event_message(email, email._events[0]), + Message.to_message(user._events[0]), + Message.to_message(user._events[1]), + Message.to_message(email._events[0]), ] messages[2].metadata = Metadata( diff --git a/tests/subscription/test_no_message_filtering.py b/tests/subscription/test_no_message_filtering.py index 1436232c..fffa869d 100644 --- a/tests/subscription/test_no_message_filtering.py +++ b/tests/subscription/test_no_message_filtering.py @@ -93,9 +93,9 @@ async def test_no_filtering_for_event_handlers_without_defined_origin_stream( email.raise_(Sent(email="john.doe@gmail.com", sent_at=datetime.now(UTC))) # Construct 3 dummy messages and modify Sent message to have originated from the user stream messages = [ - Message.to_aggregate_event_message(user, user._events[0]), - Message.to_aggregate_event_message(user, user._events[1]), - Message.to_aggregate_event_message(email, email._events[0]), + Message.to_message(user._events[0]), + Message.to_message(user._events[1]), + Message.to_message(email._events[0]), ] messages[2].metadata = Metadata( diff --git a/tests/subscription/test_read_position_updates.py b/tests/subscription/test_read_position_updates.py index 7d5d7ca7..9179e20e 100644 --- a/tests/subscription/test_read_position_updates.py +++ b/tests/subscription/test_read_position_updates.py @@ -103,7 +103,7 @@ async def test_write_position_after_interval(test_domain): last_written_position = await email_event_handler_subscription.fetch_last_position() assert last_written_position == -1 # Default value - test_domain.event_store.store.append_aggregate_event(email, email._events[0]) + test_domain.event_store.store.append(email._events[0]) await email_event_handler_subscription.tick() @@ -115,7 +115,7 @@ async def test_write_position_after_interval(test_domain): # Populate 15 messages (5 more than default interval) for _ in range(15): email.raise_(event) - test_domain.event_store.store.append_aggregate_event(email, email._events[-1]) + test_domain.event_store.store.append(email._events[-1]) await email_event_handler_subscription.tick() last_written_position = await email_event_handler_subscription.fetch_last_position() @@ -150,7 +150,7 @@ async def test_that_positions_are_not_written_when_already_in_sync(test_domain): # Populate 15 messages (5 more than default interval) for _ in range(15): email.raise_(event) - test_domain.event_store.store.append_aggregate_event(email, email._events[-1]) + test_domain.event_store.store.append(email._events[-1]) # Consume messages (By default, 10 messages per tick) await email_event_handler_subscription.tick() diff --git a/tests/unit_of_work/test_uow_transactions.py b/tests/unit_of_work/test_uow_transactions.py index 6e4edc96..d87c0bec 100644 --- a/tests/unit_of_work/test_uow_transactions.py +++ b/tests/unit_of_work/test_uow_transactions.py @@ -85,14 +85,14 @@ def test_changed_objects_are_committed_as_part_of_one_transaction( repo_with_uow.add(person_to_be_added) # Update an existing Person record - person_to_be_updated.last_name = "FooBar" - repo_with_uow.add(person_to_be_updated) + persisted_person = repo.get(person_to_be_updated.id) + persisted_person.last_name = "FooBar" + repo_with_uow.add(persisted_person) # Test that the underlying database is untouched assert len(person_dao.outside_uow().query.all().items) == 1 assert ( - person_dao.outside_uow().get(person_to_be_updated.id).last_name - != "FooBar" + person_dao.outside_uow().get(persisted_person.id).last_name != "FooBar" ) assert len(person_dao.query.all().items) == 2