From 8dae00d618b11879af7d8cc3077224254e52142f Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Mon, 1 Jul 2024 15:44:32 -0700 Subject: [PATCH] Bugfixes to reflect correct versions in Events Also: - Introduce `data_fields` to reflect `_version` in `to_dict()` output - Avoid associating aggregates to events via the `apply()` method. All events need to be registered with `part_of` explicitly. - Track current and next version better with `_next_version` field in aggregates. - Support registering custom Event Sourced repositories, in preparation for accepting custom SQL to run on MessageDB store. - Aggregate core concepts documentation --- .../building-blocks/aggregates.md | 74 ++++++++ docs/core-concepts/building-blocks/events.md | 41 +++++ docs/guides/domain-definition/events.md | 174 +++++++++++++----- .../guides/domain-definition/events/002.py | 75 ++++++++ .../guides/domain-definition/events/003.py | 52 ++++++ mkdocs.yml | 4 +- src/protean/container.py | 16 +- src/protean/core/aggregate.py | 12 +- src/protean/core/entity.py | 28 ++- src/protean/core/event.py | 11 ++ src/protean/core/event_sourced_aggregate.py | 11 -- src/protean/core/event_sourced_repository.py | 6 +- src/protean/core/repository.py | 1 + src/protean/core/unit_of_work.py | 5 + src/protean/domain/__init__.py | 4 + src/protean/port/dao.py | 8 +- src/protean/reflection.py | 21 +++ src/protean/utils/mixins.py | 6 + .../events/test_raising_fact_events.py | 34 +++- tests/aggregate/test_aggregate_new_version.py | 47 +++++ tests/aggregate/test_aggregate_properties.py | 1 + .../test_aggregates_with_entities.py | 1 + tests/aggregate/test_as_dict.py | 6 + tests/event/tests.py | 15 ++ .../events/test_fact_event_generation.py | 17 +- .../test_event_association_with_aggregate.py | 26 ++- .../test_generated_event_version.py | 149 +++++++++++++++ tests/event_sourced_repository/test_add.py | 37 +++- ...est_retrieving_event_sourced_repository.py | 31 +++- tests/event_sourced_repository/tests.py | 11 ++ tests/event_store/test_reading_all_streams.py | 12 ++ tests/field/test_auto.py | 3 +- tests/reflection/test_data_fields.py | 13 ++ tests/reflection/test_fields.py | 10 +- ...st_message_filtering_with_origin_stream.py | 15 +- .../subscription/test_no_message_filtering.py | 14 +- tests/value_object/test_to_dict.py | 14 +- 37 files changed, 905 insertions(+), 100 deletions(-) create mode 100644 docs/core-concepts/building-blocks/events.md create mode 100644 docs_src/guides/domain-definition/events/002.py create mode 100644 docs_src/guides/domain-definition/events/003.py create mode 100644 tests/aggregate/test_aggregate_new_version.py create mode 100644 tests/event_sourced_aggregates/test_generated_event_version.py create mode 100644 tests/event_sourced_repository/tests.py create mode 100644 tests/reflection/test_data_fields.py diff --git a/docs/core-concepts/building-blocks/aggregates.md b/docs/core-concepts/building-blocks/aggregates.md index a853c7da..fbf6ffbe 100644 --- a/docs/core-concepts/building-blocks/aggregates.md +++ b/docs/core-concepts/building-blocks/aggregates.md @@ -1,2 +1,76 @@ # Aggregates +An aggregate is a cluster of domain objects that can be treated as a single +unit for data changes. + +Each aggregate has a root entity, known as the aggregate root, which is +responsible for enforcing business rules and ensuring the consistency of +changes within the aggregate. In Protean, **aggregate** and **aggregate root** +are synonymous. + +Aggregates help to maintain the integrity of the data by defining boundaries +within which invariants must be maintained. + +## Facts + +### Aggregates are black boxes. { data-toc-label="Black Boxes" } +The external world communicates with aggregates solely through their published +API. Aggregates, in turn, communicate with the external world through domain +events. + +### Aggregates are versioned. { data-toc-label="Versioning" } +The version is a simple incrementing number. Every aggregate instance's version +starts at 0. + +### Aggregates have concurrency control. { data-toc-label="Concurrency Control" } +Aggregates are persisted with optimistic concurrency. If the expected version +of the aggregate does not match the version in the database, the transaction +is aborted. + +### Aggregates enclose business invariants. { data-toc-label="Invariants" } + +Aggregates contain invariants that should be satisfied at all times - they +are checked before and after every change to the aggregate. Invariants can be +specified at the level of an aggregate's fields, the entire aggregate cluster, +individual entities, or domain services that operate on multiple aggregates. + +## Object Graphs + +Aggregates compose a graph of enclosed elements. The objects themselves can nest +other objects and so on infinitely, though it is recommended to not go beyond +2 levels. + +### Aggregates can hold two types of objects - Entites and Value Objects. { data-toc-label="Types of Objects" } +Entities are objects with an identity. Value objects don't have identity; their +data defines their identity. + +### Entities are accessible only via aggregates. { data-toc-label="Entity Access" } +Entities within aggregates are loaded and accessible only through the aggregate. +All changes to entities should be driven through the aggregates. + +## Persistence + +Data persistence and retrieval are always at the level of an aggregate. +They internally load and manage the objects within their cluster. + +### Aggregates persist data with the help of Repositories. { data-toc-label="Repositories" } + +Aggregates are persisted and retrieved with the help of repositories. +Repositories are collection-oriented - they mimic how a collection data type, +like list, dictionary and set, would work. Repositories can be augmented with +custom methods to perform business queries. + +### Aggregates are transaction boundaries. { data-toc-label="Transactions" } + +All changes to aggregates are performed within a transaction. This means that +all objects in the aggregates cluster are enclosed in a single transaction +during persistence. This also translates to mean that all objects within an +aggregate cluster are kep together in the same persistence store. + +### Aggregates can enclose up to 500 entities. { data-toc-label="Limits" } + +The object graph under an aggregate is loaded eagerly. The number of associations +under an aggregate are limited to 500. If you expect the number of entities to +exceed this limit, rethink your aggregate boundary. One way would be to split +the aggregate into multiple aggregates. Another would be to make the underlying +entity an aggregate by itself. diff --git a/docs/core-concepts/building-blocks/events.md b/docs/core-concepts/building-blocks/events.md new file mode 100644 index 00000000..df8243b5 --- /dev/null +++ b/docs/core-concepts/building-blocks/events.md @@ -0,0 +1,41 @@ +# Events + +### Events allows different components to communicate with each other. + +Within a domain or across, events can be used as a mechanism to implement +eventual consistency, in the same bounded context or across. This promotes +loose coupling by decoupling the producer (e.g., an aggregate that raises +an event) from the consumers (e.g., various components that handle the +event). + +Such a design eliminates the need for two-phase commits (global +transactions) across bounded contexts, optimizing performance at the level +of each transaction. + +### Events act as API contracts. + +Events define a clear and consistent structure for data that is shared +between different components of the system. This promotes system-wide +interoperability and integration between components. + +### Events help preserve context boundaries. + +Events propagate information across bounded contexts, thus helping to +sync changes throughout the application domain. This allows each domain +to be modeled in the architecture pattern that is most appropriate for its +use case. + +- Events should be named in past tense, because we observe domain events _after +the fact_. `StockDepleted` is a better choice than the imperative +`DepleteStock` as an event name. +- An event is associated with an aggregate or a stream, specified with +`part_of` or `stream` parameters to the decorator, as above. We will +dive deeper into these parameters in the Processing Events section. + +- Events are essentially Data Transfer Objects (DTO)- they can only hold +simple fields and Value Objects. +- Events should only contain information directly relevant to the event. A +receiver that needs more information should be listening to other pertinent +events and add read-only structures to its own state to take decisions later. +A receiver should not query the current state from the sender because the +sender's state could have already mutated. \ No newline at end of file diff --git a/docs/guides/domain-definition/events.md b/docs/guides/domain-definition/events.md index 712bfbb3..12994ad3 100644 --- a/docs/guides/domain-definition/events.md +++ b/docs/guides/domain-definition/events.md @@ -10,33 +10,6 @@ occurrence or change in the domain. Events are raised by aggregates to signal that something noteworthy has happened, allowing other parts of the system to react - and sync - to these changes in a decoupled manner. -Events have a few primary functions: - -1. **Events allows different components to communicate with each other.** - - Within a domain or across, events can be used as a mechanism to implement - eventual consistency, in the same bounded context or across. This promotes - loose coupling by decoupling the producer (e.g., an aggregate that raises - an event) from the consumers (e.g., various components that handle the - event). - - Such a design eliminates the need for two-phase commits (global - transactions) across bounded contexts, optimizing performance at the level - of each transaction. - -2. **Events act as API contracts.** - - Events define a clear and consistent structure for data that is shared - between different components of the system. This promotes system-wide - interoperability and integration between components. - -3. **Events help preserve context boundaries.** - - Events propagate information across bounded contexts, thus helping to - sync changes throughout the application domain. This allows each domain - to be modeled in the architecture pattern that is most appropriate for its - use case. - ## Defining Events Event names should be descriptive and convey the specific change or occurrence @@ -54,22 +27,135 @@ Events are always connected to an Aggregate class, specified with the `part_of` param in the decorator. An exception to this rule is when the Event class has been marked _Abstract_. -## Key Facts - -- Events should be named in past tense, because we observe domain events _after -the fact_. `StockDepleted` is a better choice than the imperative -`DepleteStock` as an event name. -- An event is associated with an aggregate or a stream, specified with -`part_of` or `stream` parameters to the decorator, as above. We will -dive deeper into these parameters in the Processing Events section. - -- Events are essentially Data Transfer Objects (DTO)- they can only hold -simple fields and Value Objects. -- Events should only contain information directly relevant to the event. A -receiver that needs more information should be listening to other pertinent -events and add read-only structures to its own state to take decisions later. -A receiver should not query the current state from the sender because the -sender's state could have already mutated. + +## Event Structure + +An event is made of three parts: + +### Headers + +#### `trace_id` + +The `trace_id` is a unique identifier of UUID format, that connects all +processing originating from a request. Trace IDs provide a detailed view of +the request's journey through the system. It helps in understanding the +complete flow of a request, showing each service interaction, the time taken, +and where any delays occur. + +### Metadata + +An event's metadata provides additional context about the event. + +#### `id` + +The unique identifier of the event. The event ID is a structured string, of the +format **....**. + +#### `timestamp` + +The timestamp of event generation. + +#### `version` + +The version of the event. + +#### `sequence_id` + +The sequence ID is the version of the aggregate when the event was generated, +along with the sequence number of the event within the update. + +For example, if the aggregate was updated twice, the first update would have a +sequence ID of `1.1`, and the second update would have a sequence ID of `2.1`. +If the next update generated two events, then the sequence ID of the second +event would be `3.2`. + +#### `payload_hash` + +The hash of the event's payload. + +## Payload + +The payload is a dictionary of key-value pairs that convey the information +about the event. + +The payload is made available as the data in the event. If +you want to extract just the payload, you can use the `payload` property +of the event. + +```shell hl_lines="17 19-20" +In [1]: user = User(id="1", email="", name="") + +In [2]: user.login() + +In [3]: event = user._events[0] + +In [4]: event +Out[4]: + +In [5]: event.to_dict() +Out[5]: +{'_metadata': {'id': '002.User.v1.1.0.1', + 'timestamp': '2024-06-30 19:20:53.587542+00:00', + 'version': 'v1', + 'sequence_id': '0.1', + 'payload_hash': 5473995227001335107}, + 'user_id': '1'} + +In [6]: event.payload +Out[6]: {'user_id': '1'} +``` + +## Versioning + +Because events serve as API contracts of an aggregate with the rest of the +ecosystem, they are versioned to signal changes to contract. + +By default, events have a version of **v1**. + +You can specify a version with the `__version__` class attribute: + +```python hl_lines="3" +@domain.event(part_of=User) +class UserActivated: + __version__ = "v2" + + user_id = Identifier(required=True) + activated_at = DateTime(required=True) +``` + +The configured version is reflected in `version` and `id` attributes of the +generated event: + +```python hl_lines="34 50 52 66 68" +{! docs_src/guides/domain-definition/events/002.py !} +``` + +## Fact Events + +A fact event encloses the entire state of the aggregate at that specific point +in time. It contains all of the attributes and values necessary to completely +describe the fact in the context of your business. You can think of a fact +event similarly to how you may think of a row in a database: a complete set of +data pertaining to the row at that point in time. + +Fact events enable a pattern known as **Event-carried State Transfer**, which is +one of the best ways to asynchronously distribute immutable state to all +consumers who need it. With fact events, consumers do not have to build up the +state themselves from multiple delta event types, which can be risky and +error-prone, especially as data schemas evolve and change over time. Instead, +they rely on the owning service to compute and produce a fully detailed fact +event. + +Fact events are generated automatically by the framework with the +`fact_events=True` option in the `domain.aggregate` decorator. + +Fact events are automatically generated by Protean. The event name is of the +format `FactEvent`, and the stream name will be +`--`. + +```python hl_lines="11 38-52" +{! docs_src/guides/domain-definition/events/003.py!} +``` ## Immutability @@ -89,4 +175,4 @@ IncorrectUsageError: { 'Event Objects are immutable and cannot be modified once created' ] } -``` \ No newline at end of file +``` diff --git a/docs_src/guides/domain-definition/events/002.py b/docs_src/guides/domain-definition/events/002.py new file mode 100644 index 00000000..e207393c --- /dev/null +++ b/docs_src/guides/domain-definition/events/002.py @@ -0,0 +1,75 @@ +import json +from datetime import datetime, timezone + +from protean import BaseEvent, Domain +from protean.fields import DateTime, Identifier, String + +domain = Domain(__name__) + + +@domain.aggregate +class User: + id = Identifier(identifier=True) + email = String() + name = String() + status = String(choices=["INACTIVE", "ACTIVE", "ARCHIVED"], default="INACTIVE") + + def login(self): + self.raise_(UserLoggedIn(user_id=self.id)) + + def activate(self): + self.status = "ACTIVE" + self.raise_(UserActivated(user_id=self.id)) + + +@domain.event(part_of="User") +class UserLoggedIn(BaseEvent): + user_id = Identifier(identifier=True) + + +@domain.event(part_of="User") +class UserActivated: + __version__ = "v2" + + user_id = Identifier(required=True) + activated_at = DateTime(required=True, default=lambda: datetime.now(timezone.utc)) + + +domain.init(traverse=False) +with domain.domain_context(): + user = User(id="1", email="", name="") + + user.login() + print(json.dumps(user._events[0].to_dict(), indent=4)) + + """ Output: + { + "_metadata": { + "id": "__main__.User.v1.1.0.1", + "timestamp": "2024-06-30 16:29:31.312727+00:00", + "version": "v1", + "sequence_id": "0.1", + "payload_hash": -7433283101704735063 + }, + "user_id": "1" + } + """ + + user.activate() + print(json.dumps(user._events[1].to_dict(), indent=4)) + + """ Output: + { + "_metadata": { + "id": "__main__.User.v2.1.0.2", + "timestamp": "2024-06-30 16:32:59.703965+00:00", + "version": "v2", + "sequence_id": "0.2", + "payload_hash": 7340170219237812824 + }, + "user_id": "1", + "activated_at": "2024-06-30 16:32:59.704063+00:00" + } + """ + + print(json.dumps(user._events[1].payload, indent=4)) diff --git a/docs_src/guides/domain-definition/events/003.py b/docs_src/guides/domain-definition/events/003.py new file mode 100644 index 00000000..e27eccef --- /dev/null +++ b/docs_src/guides/domain-definition/events/003.py @@ -0,0 +1,52 @@ +import json + +from protean import Domain +from protean.fields import HasOne, String +from protean.utils.mixins import Message + +domain = Domain(__file__, load_toml=False) + + +@domain.aggregate(fact_events=True) +class User: + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=["ACTIVE", "ARCHIVED"]) + + account = HasOne("Account") + + +@domain.entity(part_of=User) +class Account: + password_hash = String(max_length=512) + + +domain.init(traverse=False) +with domain.domain_context(): + user = User(name="John Doe", email="john.doe@example.com") + + # Persist the user + domain.repository_for(User).add(user) + + event_message = domain.event_store.store.read(f"user-fact-{user.id}")[0] + event = Message.to_object(event_message) + + print(json.dumps(event.to_dict(), indent=4)) + + """ Output: + { + "_metadata": { + "id": "__main__.User.v1.e6bb751f-1304-4609-b1ff-b0ffad8e01ad.0.1", + "timestamp": "2024-06-30 19:41:15.997664+00:00", + "version": "v1", + "sequence_id": "0.1", + "payload_hash": 2404640527973230107 + }, + "_version": 0, + "name": "John Doe", + "email": "john.doe@example.com", + "status": null, + "account": null, + "id": "e6bb751f-1304-4609-b1ff-b0ffad8e01ad" + } + """ diff --git a/mkdocs.yml b/mkdocs.yml index 8ccb6c7f..2dc6d999 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -74,13 +74,13 @@ nav: # - core-concepts/domain-driven-design/tactical-design.md - Building Blocks: - core-concepts/building-blocks/index.md - # - core-concepts/building-blocks/aggregates.md + - core-concepts/building-blocks/aggregates.md # - core-concepts/building-blocks/entities.md # - core-concepts/building-blocks/value-objects.md # - core-concepts/building-blocks/domain-services.md # - core-concepts/building-blocks/commands.md # - core-concepts/building-blocks/command-handlers.md - # - core-concepts/building-blocks/events.md + - core-concepts/building-blocks/events.md # - core-concepts/building-blocks/event-handlers.md # - core-concepts/building-blocks/repositories.md # - core-concepts/building-blocks/models.md diff --git a/src/protean/container.py b/src/protean/container.py index 6db6853e..6125f1ff 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -17,7 +17,14 @@ from protean.reflection import id_field from protean.utils import generate_identity -from .reflection import _FIELDS, _ID_FIELD_NAME, attributes, declared_fields, fields +from .reflection import ( + _FIELDS, + _ID_FIELD_NAME, + attributes, + data_fields, + declared_fields, + fields, +) logger = logging.getLogger(__name__) @@ -342,7 +349,7 @@ def to_dict(self): """Return data as a dictionary""" return { field_name: field_obj.as_dict(getattr(self, field_name, None)) - for field_name, field_obj in fields(self).items() + for field_name, field_obj in data_fields(self).items() } @classmethod @@ -368,7 +375,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._events = [] - def raise_(self, event) -> None: + def raise_(self, event, fact_event=False) -> None: """Raise an event in the aggregate cluster. The version of the aggregate is incremented with every event raised, which is true @@ -377,7 +384,8 @@ def raise_(self, event) -> None: Event is immutable, so we clone a new event object from the event raised, and add the enhanced metadata to it. """ - self._version += 1 + if not fact_event: + self._version += 1 identifier = getattr(self, id_field(self).field_name) diff --git a/src/protean/core/aggregate.py b/src/protean/core/aggregate.py index 7425860c..1a521f6f 100644 --- a/src/protean/core/aggregate.py +++ b/src/protean/core/aggregate.py @@ -48,6 +48,9 @@ def __new__(cls, *args, **kwargs): # Track current version of Aggregate _version = Integer(default=-1) + # Temporary variable to track next version of Aggregate + _next_version = Integer(default=0) + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -55,6 +58,9 @@ def __init__(self, *args, **kwargs): # This is where we kick-off the process of setting the owner and root self._set_root_and_owner(self, self) + # Increment version and set next version + self._next_version = self._version + 1 + @classmethod def _default_options(cls): return [ @@ -83,11 +89,13 @@ def element_to_fact_event(element_cls): The target class of associations is constructed as the Value Object. """ # Gather all fields defined in the element, except References. - # We ignore references. + # We ignore references in event payloads. We also ignore + # the `_next_version` field because it is a temporary in-flight + # field used to track the next version of the aggregate. attrs = { key: value for key, value in fields(element_cls).items() - if not isinstance(value, Reference) + if not isinstance(value, Reference) and key not in ["_next_version"] } # Recursively convert HasOne and HasMany associations to Value Objects diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index 7f4a86af..e8134876 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -12,7 +12,14 @@ from protean.fields import Auto, HasMany, Reference, ValueObject from protean.fields.association import Association from protean.globals import current_domain -from protean.reflection import _FIELDS, attributes, declared_fields, fields, id_field +from protean.reflection import ( + _FIELDS, + attributes, + data_fields, + declared_fields, + fields, + id_field, +) from protean.utils import ( DomainObjects, derive_element_class, @@ -421,9 +428,16 @@ def raise_(self, event) -> None: The event is always registered on the aggregate, irrespective of where it is raised in the entity cluster.""" - - # We consider the version of the aggregate *after* persistence - new_version = self._root._version + 1 + # Events are sometimes raised from within the aggregate, well-before persistence. + # In that case, the aggregate's next version has to be considered in events, + # because we want to associate the event with the version that will be persisted. + # + # Other times, an event is generated after persistence, like in the case of + # fact events. In this case, the aggregate's current version and next version + # will be the same. + # + # So we simply take the latest version, among `_version` and `_next_version`. + aggregate_version = max(self._root._version, self._root._next_version) # This is just a counter to uniquely gather all events generated # in the same edit session @@ -436,11 +450,11 @@ def raise_(self, event) -> None: _metadata={ "id": ( f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}" - f".{identifier}.{new_version}.{event_number}" + f".{identifier}.{aggregate_version}.{event_number}" ), "timestamp": event._metadata.timestamp, "version": event._metadata.version, - "sequence_id": f"{new_version}.{event_number}", + "sequence_id": f"{aggregate_version}.{event_number}", "payload_hash": hash( json.dumps( event.payload, @@ -512,7 +526,7 @@ def to_dict(self): # FIXME Memoize this function field_values = {} - for field_name, field_obj in declared_fields(self).items(): + for field_name, field_obj in data_fields(self).items(): if ( not isinstance(field_obj, (ValueObject, Reference)) and getattr(self, field_name, None) is not None diff --git a/src/protean/core/event.py b/src/protean/core/event.py index 87facd5a..3c015f4b 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -143,6 +143,17 @@ def __hash__(self) -> int: """Hash based on data.""" return hash(json.dumps(self.payload, sort_keys=True)) + def to_dict(self): + """Return data as a dictionary. + + We need to override this method in Event, because `to_dict()` of `BaseContainer` + eliminates `_metadata`. + """ + return { + field_name: field_obj.as_dict(getattr(self, field_name, None)) + for field_name, field_obj in fields(self).items() + } + def domain_event_factory(element_cls, **kwargs): element_cls = derive_element_class(element_cls, BaseEvent, **kwargs) diff --git a/src/protean/core/event_sourced_aggregate.py b/src/protean/core/event_sourced_aggregate.py index 7dca00a6..0ba0e7ea 100644 --- a/src/protean/core/event_sourced_aggregate.py +++ b/src/protean/core/event_sourced_aggregate.py @@ -175,15 +175,4 @@ def event_sourced_aggregate_factory(element_cls, **opts): method._event_cls ) - # Associate Event with the aggregate class - # - # This can potentially cause a problem because an Event can only be associated - # with one aggregate class, but multiple event handlers can consume it. - # By resetting the event's aggregate class, its previous association is lost. - # We catch this problem during domain validation. - # - # The domain validation should check for the same event class being present - # in `_events_cls_map` of multiple aggregate classes. - method._event_cls.meta_.part_of = element_cls - return element_cls diff --git a/src/protean/core/event_sourced_repository.py b/src/protean/core/event_sourced_repository.py index e64aff45..f799f00e 100644 --- a/src/protean/core/event_sourced_repository.py +++ b/src/protean/core/event_sourced_repository.py @@ -54,8 +54,8 @@ def add(self, aggregate: BaseEventSourcedAggregate) -> None: payload = aggregate.to_dict() # Construct and raise the Fact Event - fact_event = aggregate._fact_event_cls(**payload) - aggregate.raise_(fact_event) + fact_event_obj = aggregate._fact_event_cls(**payload) + aggregate.raise_(fact_event_obj, fact_event=True) uow._add_to_identity_map(aggregate) @@ -115,7 +115,7 @@ def event_sourced_repository_factory(element_cls, **opts): raise IncorrectUsageError( { "_entity": [ - f"Repository `{element_cls.__name__}` can only be associated with an Aggregate" + f"Repository `{element_cls.__name__}` can only be associated with an Event Sourced Aggregate" ] } ) diff --git a/src/protean/core/repository.py b/src/protean/core/repository.py index 53efef87..ecd0cee5 100644 --- a/src/protean/core/repository.py +++ b/src/protean/core/repository.py @@ -139,6 +139,7 @@ def add(self, aggregate: BaseAggregate) -> BaseAggregate: # noqa: C901 # Remove state attribute from the payload, as it is not needed for the Fact Event payload.pop("state_", None) + payload.pop("_next_version", None) # Construct and raise the Fact Event fact_event = aggregate._fact_event_cls(**payload) diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index bf6e156a..f7cb9c19 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -1,6 +1,7 @@ import logging from protean.exceptions import ( + ConfigurationError, ExpectedVersionError, InvalidOperationError, ValidationError, @@ -114,6 +115,10 @@ def commit(self): # noqa: C901 else: msg = str(exc) raise ExpectedVersionError(msg) from None + except ConfigurationError as exc: + # Configuration errors can be raised if events are misconfigured + # We just re-raise it for the client to handle. + raise exc except Exception as exc: logger.error( f"Error during Commit: {str(exc)}. Rolling back Transaction..." diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index 5423014e..356b775a 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -391,6 +391,9 @@ def factory_for(self, domain_object_type): from protean.core.event import domain_event_factory from protean.core.event_handler import event_handler_factory from protean.core.event_sourced_aggregate import event_sourced_aggregate_factory + from protean.core.event_sourced_repository import ( + event_sourced_repository_factory, + ) from protean.core.model import model_factory from protean.core.repository import repository_factory from protean.core.serializer import serializer_factory @@ -406,6 +409,7 @@ def factory_for(self, domain_object_type): DomainObjects.EVENT.value: domain_event_factory, DomainObjects.EVENT_HANDLER.value: event_handler_factory, DomainObjects.EVENT_SOURCED_AGGREGATE.value: event_sourced_aggregate_factory, + DomainObjects.EVENT_SOURCED_REPOSITORY.value: event_sourced_repository_factory, DomainObjects.DOMAIN_SERVICE.value: domain_service_factory, DomainObjects.EMAIL.value: email_factory, DomainObjects.ENTITY.value: entity_factory, diff --git a/src/protean/port/dao.py b/src/protean/port/dao.py index 84a0bd59..fd0847e9 100644 --- a/src/protean/port/dao.py +++ b/src/protean/port/dao.py @@ -353,15 +353,17 @@ def _validate_and_update_version(self, entity_obj) -> None: identifier = getattr(entity_obj, id_field(self.entity_cls).field_name) persisted_entity = self.get(identifier) + # The version of aggregate in the persistence store should be the same as + # the version we are dealing with. if persisted_entity._version != entity_obj._version: raise ExpectedVersionError( f"Wrong expected version: {entity_obj._version} " f"(Aggregate: {self.entity_cls.__name__}({identifier}), Version: {persisted_entity._version})" ) - entity_obj._version += 1 - else: - entity_obj._version = 0 + # Now that we are certain we are dealing with the correct version, + # we can safely update the version to the next version. + entity_obj._version = entity_obj._next_version def save(self, entity_obj): """Create or update an entity in the data store, depending on its state. An identity for entity record is diff --git a/src/protean/reflection.py b/src/protean/reflection.py index b6c2753a..3975b59c 100644 --- a/src/protean/reflection.py +++ b/src/protean/reflection.py @@ -24,6 +24,26 @@ def fields(class_or_instance): return fields_dict +def data_fields(class_or_instance): + """Return a tuple describing the data fields of this dataclass. + + Accepts a dataclass or an instance of one. Tuple elements are of + type Field. + """ + try: + fields_dict = dict(getattr(class_or_instance, _FIELDS)) + + # Remove internal fields + fields_dict.pop("_next_version", None) + fields_dict.pop("_metadata", None) + except AttributeError: + raise IncorrectUsageError( + {"field": [f"{class_or_instance} does not have fields"]} + ) + + return fields_dict + + def id_field(class_or_instance): try: field_name = getattr(class_or_instance, _ID_FIELD_NAME) @@ -98,6 +118,7 @@ def declared_fields(class_or_instance): # Remove internal fields fields_dict.pop("_version", None) + fields_dict.pop("_next_version", None) fields_dict.pop("_metadata", None) except AttributeError: raise IncorrectUsageError( diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index 5beaaac9..d541e598 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -141,6 +141,12 @@ def to_aggregate_event_message( # Use explicit stream name if provided, or fallback on Aggregate's stream name stream_name = event.meta_.stream_name or aggregate_stream_name + if not stream_name: + raise ConfigurationError( + f"No stream name found for `{event.__class__.__name__}`. " + "Either specify an explicit stream name or associate the event with an aggregate." + ) + return cls( stream_name=f"{stream_name}-{identifier}", type=fully_qualified_name(event.__class__), diff --git a/tests/aggregate/events/test_raising_fact_events.py b/tests/aggregate/events/test_raising_fact_events.py index 15f31fac..cf4f3813 100644 --- a/tests/aggregate/events/test_raising_fact_events.py +++ b/tests/aggregate/events/test_raising_fact_events.py @@ -24,7 +24,8 @@ def register_elements(test_domain): test_domain.init(traverse=False) -def test_generation_of_first_fact_event_on_persistence(test_domain): +@pytest.fixture +def event(test_domain): user = User(name="John Doe", email="john.doe@example.com") test_domain.repository_for(User).add(user) @@ -34,7 +35,38 @@ def test_generation_of_first_fact_event_on_persistence(test_domain): # Deserialize event event = Message.to_object(event_messages[0]) + + return event + + +def test_generation_of_first_fact_event_on_persistence(event): assert event is not None assert event.__class__.__name__ == "UserFactEvent" assert event.name == "John Doe" assert event.email == "john.doe@example.com" + + +def test_fact_event_version_metadata(event): + assert event._metadata.id.endswith(".0.1") + assert event._metadata.sequence_id == "0.1" + assert event._version == 0 + + +def test_fact_event_version_metadata_after_second_edit(test_domain): + user = User(name="John Doe", email="john.doe@example.com") + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.name = "Jane Doe" + test_domain.repository_for(User).add(refreshed_user) + + # Read event from event store + event_messages = test_domain.event_store.store.read(f"user-fact-{user.id}") + assert len(event_messages) == 2 + + # Deserialize event + event = Message.to_object(event_messages[1]) + + assert event._metadata.id.endswith(".1.1") + assert event._metadata.sequence_id == "1.1" + assert event._version == 1 diff --git a/tests/aggregate/test_aggregate_new_version.py b/tests/aggregate/test_aggregate_new_version.py new file mode 100644 index 00000000..e6834c2e --- /dev/null +++ b/tests/aggregate/test_aggregate_new_version.py @@ -0,0 +1,47 @@ +import pytest + +from protean import BaseAggregate +from protean.fields import String + + +class User(BaseAggregate): + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=["ACTIVE", "ARCHIVED"], default="ACTIVE") + + +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.init(traverse=False) + + +def test_aggregate_on_initializaton_has_next_version_0(): + user = User(name="John Doe", email="john.doe@example.com") + assert user._version == -1 + assert user._next_version == 0 + + +def test_aggregate_after_first_persistence_has_next_version_1(test_domain): + user = User(name="John Doe", email="john.doe@example.com") + test_domain.repository_for(User).add(user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 0 + assert refreshed_user._next_version == 1 + + +def test_aggregate_after_multiple_persistences_has_next_version_incremented( + test_domain, +): + user = User(name="John Doe", email="john.doe@example.com") + test_domain.repository_for(User).add(user) + + for i in range(10): + refreshed_user = test_domain.repository_for(User).get(user.id) + refreshed_user.name = f"Jane Doe {i}" + test_domain.repository_for(User).add(refreshed_user) + + refreshed_user = test_domain.repository_for(User).get(user.id) + assert refreshed_user._version == 10 + assert refreshed_user._next_version == 11 diff --git a/tests/aggregate/test_aggregate_properties.py b/tests/aggregate/test_aggregate_properties.py index dbf8df30..831c1bc5 100644 --- a/tests/aggregate/test_aggregate_properties.py +++ b/tests/aggregate/test_aggregate_properties.py @@ -18,6 +18,7 @@ def test_conversion_of_aggregate_values_to_dict(self): "id": 12, "name": "ADMIN", "created_on": str(current_time), + "_version": -1, } def test_repr_output_of_aggregate(self): diff --git a/tests/aggregate/test_aggregates_with_entities.py b/tests/aggregate/test_aggregates_with_entities.py index e16d05ad..0457b8fa 100644 --- a/tests/aggregate/test_aggregates_with_entities.py +++ b/tests/aggregate/test_aggregates_with_entities.py @@ -111,4 +111,5 @@ def test_conversion_of_enclosed_entity_values_to_dict(self, persisted_post): "commented_at": str(comment2.commented_at), }, ], + "_version": -1, } diff --git a/tests/aggregate/test_as_dict.py b/tests/aggregate/test_as_dict.py index 6e9be42e..24695a12 100644 --- a/tests/aggregate/test_as_dict.py +++ b/tests/aggregate/test_as_dict.py @@ -28,6 +28,7 @@ class Post(BaseAggregate): "title": "Test Post", "slug": "test-post", "content": "Do Re Mi Fa", + "_version": -1, } def test_as_dict_with_date_fields(self): @@ -51,6 +52,7 @@ class Post(BaseAggregate): "slug": "test-post", "content": "Do Re Mi Fa", "posted_at": str(current_time), + "_version": -1, } def test_as_dict_with_aggregate_that_has_many_entities(self, test_domain): @@ -84,6 +86,7 @@ class Post(BaseAggregate): {"id": comment1.id, "content": "first comment"}, {"id": comment2.id, "content": "second comment"}, ], + "_version": -1, } def test_as_dict_with_aggregate_that_has_many_entities_with_reference( @@ -118,6 +121,7 @@ class Post(BaseAggregate): {"id": comment1.id, "content": "first comment"}, {"id": comment2.id, "content": "second comment"}, ], + "_version": -1, } def test_as_dict_with_aggregate_that_has_one_entity(self, test_domain): @@ -146,6 +150,7 @@ class PostMeta(BaseEntity): "slug": "test-post", "content": "Do Re Mi Fa", "meta": {"id": meta.id, "likes": 27}, + "_version": -1, } def test_as_dict_with_aggregate_that_has_a_value_object(self, test_domain): @@ -163,4 +168,5 @@ class User(BaseAggregate): "address": "john.doe@gmail.com", }, "password": "secret", + "_version": -1, } diff --git a/tests/event/tests.py b/tests/event/tests.py index dae160a3..4dee98e7 100644 --- a/tests/event/tests.py +++ b/tests/event/tests.py @@ -5,12 +5,27 @@ from protean import BaseEvent, BaseValueObject from protean.exceptions import NotSupportedError from protean.fields import String, ValueObject +from protean.reflection import data_fields, declared_fields, fields from protean.utils import fully_qualified_name from .elements import Person, PersonAdded class TestDomainEventDefinition: + def test_domain_event_dict_keys(self): + assert all( + key in declared_fields(PersonAdded) + for key in ["first_name", "last_name", "age", "id"] + ) + assert all( + key in data_fields(PersonAdded) + for key in ["first_name", "last_name", "age", "id"] + ) + assert all( + key in fields(PersonAdded) + for key in ["first_name", "last_name", "age", "id", "_metadata"] + ) + def test_that_domain_event_can_accommodate_value_objects(self, test_domain): class Email(BaseValueObject): address = String(max_length=255) diff --git a/tests/event_sourced_aggregates/events/test_fact_event_generation.py b/tests/event_sourced_aggregates/events/test_fact_event_generation.py index 32ca7a02..020dcf24 100644 --- a/tests/event_sourced_aggregates/events/test_fact_event_generation.py +++ b/tests/event_sourced_aggregates/events/test_fact_event_generation.py @@ -63,6 +63,11 @@ def test_generation_of_first_fact_event_on_persistence(test_domain): assert event.name == "John Doe" assert event.email == "john.doe@example.com" + # Check event versions + assert event._metadata.id.endswith(".0") + assert event._metadata.sequence_id == "0" + assert event._version == 0 + def test_generation_of_subsequent_fact_events_after_fetch(test_domain): # Initialize and save @@ -90,14 +95,22 @@ def test_generation_of_subsequent_fact_events_after_fetch(test_domain): fact_event_messages = test_domain.event_store.store.read(f"user-fact-{user.id}") assert len(fact_event_messages) == 2 - # Deserialize 1st event + # Deserialize 1st event and verify event = Message.to_object(fact_event_messages[0]) assert event is not None assert event.__class__.__name__ == "UserFactEvent" assert event.name == "John Doe" - # Deserialize 2nd event + assert event._metadata.id.endswith(".0") + assert event._metadata.sequence_id == "0" + assert event._version == 0 + + # Deserialize 2nd event and verify event = Message.to_object(fact_event_messages[1]) assert event is not None assert event.__class__.__name__ == "UserFactEvent" assert event.name == "Jane Doe" + + assert event._metadata.id.endswith(".1") + assert event._metadata.sequence_id == "1" + assert event._version == 1 diff --git a/tests/event_sourced_aggregates/test_event_association_with_aggregate.py b/tests/event_sourced_aggregates/test_event_association_with_aggregate.py index 94757d63..3843a792 100644 --- a/tests/event_sourced_aggregates/test_event_association_with_aggregate.py +++ b/tests/event_sourced_aggregates/test_event_association_with_aggregate.py @@ -3,7 +3,7 @@ import pytest from protean import BaseEvent, BaseEventSourcedAggregate, apply -from protean.exceptions import IncorrectUsageError +from protean.exceptions import ConfigurationError, IncorrectUsageError from protean.fields import Identifier, String @@ -28,6 +28,10 @@ class UserRenamed(BaseEvent): name = String(required=True, max_length=50) +class UserArchived(BaseEvent): + user_id = Identifier(required=True) + + class User(BaseEventSourcedAggregate): user_id = Identifier(identifier=True) name = String(max_length=50, required=True) @@ -70,10 +74,14 @@ def registered(self, _: UserRegistered): @pytest.fixture(autouse=True) def register_elements(test_domain): test_domain.register(User) + test_domain.register(UserRegistered, part_of=User) + test_domain.register(UserActivated, part_of=User) + test_domain.register(UserRenamed, part_of=User) + test_domain.register(Email) @pytest.mark.eventstore -def test_that_event_is_associated_with_aggregate_by_apply_methods(): +def test_that_event_is_associated_with_aggregate(): assert UserRegistered.meta_.part_of == User assert UserActivated.meta_.part_of == User assert UserRenamed.meta_.part_of == User @@ -93,3 +101,17 @@ def test_that_trying_to_associate_an_event_with_multiple_aggregates_throws_an_er "tests.event_sourced_aggregates.test_event_association_with_aggregate.UserRegistered" ] } + + +@pytest.mark.eventstore +def test_an_unassociated_event_throws_error(test_domain): + user = User.register(user_id="1", name="", email="") + user.raise_(UserArchived(user_id=user.user_id)) + + with pytest.raises(ConfigurationError) as exc: + test_domain.repository_for(User).add(user) + + assert exc.value.args[0] == ( + "No stream name found for `UserArchived`. " + "Either specify an explicit stream name or associate the event with an aggregate." + ) diff --git a/tests/event_sourced_aggregates/test_generated_event_version.py b/tests/event_sourced_aggregates/test_generated_event_version.py new file mode 100644 index 00000000..fe1f2487 --- /dev/null +++ b/tests/event_sourced_aggregates/test_generated_event_version.py @@ -0,0 +1,149 @@ +from enum import Enum + +import pytest + +from protean import BaseEvent, BaseEventSourcedAggregate, apply +from protean.fields import Identifier, String +from protean.utils.mixins import Message + + +class UserStatus(Enum): + ACTIVE = "ACTIVE" + INACTIVE = "INACTIVE" + ARCHIVED = "ARCHIVED" + + +class UserRegistered(BaseEvent): + user_id = Identifier(required=True) + name = String(max_length=50, required=True) + email = String(required=True) + + +class UserActivated(BaseEvent): + user_id = Identifier(required=True) + + +class UserRenamed(BaseEvent): + user_id = Identifier(required=True) + name = String(required=True, max_length=50) + + +class User(BaseEventSourcedAggregate): + user_id = Identifier(identifier=True) + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=UserStatus) + + @classmethod + def register(cls, user_id, name, email): + user = cls(user_id=user_id, name=name, email=email) + user.raise_(UserRegistered(user_id=user_id, name=name, email=email)) + return user + + def activate(self): + self.raise_(UserActivated(user_id=self.user_id)) + + def change_name(self, name): + self.raise_(UserRenamed(user_id=self.user_id, name=name)) + + @apply + def registered(self, _: UserRegistered): + self.status = UserStatus.INACTIVE.value + + @apply + def activated(self, _: UserActivated): + self.status = UserStatus.ACTIVE.value + + @apply + def renamed(self, event: UserRenamed): + self.name = event.name + + +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(UserRegistered, part_of=User) + test_domain.register(UserActivated, part_of=User) + test_domain.register(UserRenamed, part_of=User) + test_domain.init(traverse=False) + + +def test_aggregate_and_event_version_on_initialization(): + user = User.register(user_id="1", name="John Doe", email="john.doe@example.com") + assert user._version == 0 + assert user._events[0]._metadata.id.endswith(".0") + assert user._events[0]._metadata.sequence_id == "0" + + +def test_aggregate_and_event_version_after_first_persistence(test_domain): + user = User.register(user_id="1", name="John Doe", email="john.doe@example.com") + test_domain.repository_for(User).add(user) + + event_messages = test_domain.event_store.store.read(f"user-{user.user_id}") + assert len(event_messages) == 1 + + refreshed_user = test_domain.repository_for(User).get(user.user_id) + assert refreshed_user._version == 0 + + # Deserialize event + event = Message.to_object(event_messages[0]) + + assert event._metadata.id.endswith(".0") + assert event._metadata.sequence_id == "0" + + +def test_aggregate_and_event_version_after_first_persistence_after_multiple_persistence( + test_domain, +): + user = User.register(user_id="1", name="John Doe", email="john.doe@example.com") + test_domain.repository_for(User).add(user) + + for i in range(10): + refreshed_user = test_domain.repository_for(User).get(user.user_id) + refreshed_user.change_name(f"John Doe {i}") + test_domain.repository_for(User).add(refreshed_user) + + event_messages = test_domain.event_store.store.read(f"user-{user.user_id}") + assert len(event_messages) == 11 + + refreshed_user = test_domain.repository_for(User).get(user.user_id) + assert refreshed_user._version == 10 + + # Deserialize event + event = Message.to_object(event_messages[-1]) + + assert event._metadata.id.endswith(".10") + assert event._metadata.sequence_id == "10" + + +def test_aggregate_and_event_version_after_multiple_event_generation_in_one_update_cylce( + test_domain, +): + user = User.register(user_id="1", name="John Doe", email="john.doe@example.com") + user.change_name("Jane Doe") + + # Check event versions before persistence + assert user._version == 1 + assert user._events[0]._metadata.id.endswith(".0") + assert user._events[0]._metadata.sequence_id == "0" + assert user._events[1]._metadata.id.endswith(".1") + assert user._events[1]._metadata.sequence_id == "1" + + # Persist user just once + test_domain.repository_for(User).add(user) + + # Retrieve user + refreshed_user = test_domain.repository_for(User).get(user.user_id) + + assert refreshed_user._version == 1 + + event_messages = test_domain.event_store.store.read(f"user-{user.user_id}") + assert len(event_messages) == 2 + + event1 = Message.to_object(event_messages[0]) + event2 = Message.to_object(event_messages[1]) + + assert event1._metadata.id.endswith(".0") + assert event1._metadata.sequence_id == "0" + assert event2._metadata.id.endswith(".1") + assert event2._metadata.sequence_id == "1" diff --git a/tests/event_sourced_repository/test_add.py b/tests/event_sourced_repository/test_add.py index 451c4ed2..c4f31332 100644 --- a/tests/event_sourced_repository/test_add.py +++ b/tests/event_sourced_repository/test_add.py @@ -1,19 +1,34 @@ +from uuid import uuid4 + import pytest -from protean import BaseEventSourcedAggregate +from protean import BaseEvent, BaseEventSourcedAggregate from protean.exceptions import IncorrectUsageError from protean.fields import Identifier, String +class UserRegistered(BaseEvent): + id = Identifier(required=True) + name = String(max_length=50, required=True) + email = String(required=True) + + class User(BaseEventSourcedAggregate): id = Identifier(identifier=True) email = String() name = String() + @classmethod + def register(cls, id, name, email): + user = cls(id=id, name=name, email=email) + user.raise_(UserRegistered(id=id, name=name, email=email)) + return user + @pytest.fixture(autouse=True) def register_elements(test_domain): test_domain.register(User) + test_domain.register(UserRegistered, part_of=User) test_domain.init(traverse=False) @@ -24,3 +39,23 @@ def test_exception_on_empty_aggregate_object(test_domain): assert exception.value.messages == { "_entity": ["Aggregate object to persist is invalid"] } + + +def test_successful_persistence_of_aggregate(test_domain): + user = User.register(id=str(uuid4()), name="John Doe", email="john.doe@example.com") + assert len(user._events) == 1 + + test_domain.repository_for(User).add(user) + assert len(user._events) == 0 + + event_messages = test_domain.event_store.store.read(f"user-{user.id}") + assert len(event_messages) == 1 + + +def test_aggregate_with_no_changes_is_not_acted_on(test_domain): + user = User(id=str(uuid4()), name="John Doe", email="john.doe@example.com") + assert len(user._events) == 0 + + test_domain.repository_for(User).add(user) + event_messages = test_domain.event_store.store.read(f"user-{user.id}") + assert len(event_messages) == 0 diff --git a/tests/event_sourced_repository/test_retrieving_event_sourced_repository.py b/tests/event_sourced_repository/test_retrieving_event_sourced_repository.py index 392b79ad..1eb53561 100644 --- a/tests/event_sourced_repository/test_retrieving_event_sourced_repository.py +++ b/tests/event_sourced_repository/test_retrieving_event_sourced_repository.py @@ -1,6 +1,6 @@ import pytest -from protean import BaseEventSourcedAggregate +from protean import BaseAggregate, BaseEventSourcedAggregate from protean.core.event_sourced_repository import BaseEventSourcedRepository from protean.exceptions import IncorrectUsageError from protean.fields import Integer, String @@ -29,5 +29,30 @@ def test_that_a_custom_repository_cannot_be_associated_with_event_sourced_aggreg class CustomUserRepository(BaseEventSourcedRepository): pass - with pytest.raises(IncorrectUsageError): - test_domain.register(CustomUserRepository, part_of=User) + with pytest.raises(IncorrectUsageError) as exc: + test_domain.register(CustomUserRepository) + + assert exc.value.messages == { + "_entity": [ + "Repository `CustomUserRepository` should be associated with an Aggregate" + ] + } + + +def test_that_an_event_sourced_repository_can_only_be_associated_with_an_event_sourced_aggregate( + test_domain, +): + class CustomAggregate(BaseAggregate): + pass + + class CustomRepository(BaseEventSourcedRepository): + pass + + with pytest.raises(IncorrectUsageError) as exc: + test_domain.register(CustomRepository, part_of=CustomAggregate) + + assert exc.value.messages == { + "_entity": [ + "Repository `CustomRepository` can only be associated with an Event Sourced Aggregate" + ] + } diff --git a/tests/event_sourced_repository/tests.py b/tests/event_sourced_repository/tests.py new file mode 100644 index 00000000..e1b0b883 --- /dev/null +++ b/tests/event_sourced_repository/tests.py @@ -0,0 +1,11 @@ +import pytest + +from protean.core.event_sourced_repository import BaseEventSourcedRepository +from protean.exceptions import NotSupportedError + + +def test_event_sourced_aggregate_cannot_be_initialized(): + with pytest.raises(NotSupportedError) as exc: + BaseEventSourcedRepository() + + assert str(exc.value) == "BaseEventSourcedRepository cannot be instantiated" diff --git a/tests/event_store/test_reading_all_streams.py b/tests/event_store/test_reading_all_streams.py index e2a8929f..dcede259 100644 --- a/tests/event_store/test_reading_all_streams.py +++ b/tests/event_store/test_reading_all_streams.py @@ -69,6 +69,18 @@ class Published(BaseEvent): published_time = DateTime(default=utcnow_func) +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) + test_domain.register(Renamed, part_of=User) + + test_domain.register(Post) + test_domain.register(Created, part_of=Post) + test_domain.register(Published, part_of=Post) + + @pytest.mark.eventstore def test_reading_messages_from_all_streams(test_domain): user_identifier = str(uuid4()) diff --git a/tests/field/test_auto.py b/tests/field/test_auto.py index 7d82d03b..a5f92724 100644 --- a/tests/field/test_auto.py +++ b/tests/field/test_auto.py @@ -21,7 +21,7 @@ class AutoTest(BaseAggregate): assert isinstance(auto.auto_field, str) assert_str_is_uuid(str(auto.auto_field)) - assert auto.to_dict() == {"auto_field": str(auto.auto_field)} + assert auto.to_dict() == {"_version": -1, "auto_field": str(auto.auto_field)} def test_automatic_uuid_generation_of_non_identifier_fields(self, test_domain): class AutoTest(BaseAggregate): @@ -36,6 +36,7 @@ class AutoTest(BaseAggregate): assert_str_is_uuid(str(auto.auto_field2)) assert auto.to_dict() == { + "_version": -1, "id": str(auto.id), "auto_field1": str(auto.auto_field1), "auto_field2": str(auto.auto_field2), diff --git a/tests/reflection/test_data_fields.py b/tests/reflection/test_data_fields.py new file mode 100644 index 00000000..accd6377 --- /dev/null +++ b/tests/reflection/test_data_fields.py @@ -0,0 +1,13 @@ +from protean import BaseAggregate +from protean.fields import Integer, String +from protean.reflection import data_fields + + +class Person(BaseAggregate): + name = String(max_length=50, required=True) + age = Integer() + + +def test_data_fields(): + assert len(data_fields(Person)) == 4 + assert all(key in data_fields(Person) for key in ["name", "age", "id", "_version"]) diff --git a/tests/reflection/test_fields.py b/tests/reflection/test_fields.py index 549ac39d..03295a30 100644 --- a/tests/reflection/test_fields.py +++ b/tests/reflection/test_fields.py @@ -3,7 +3,7 @@ from protean import BaseAggregate from protean.exceptions import IncorrectUsageError from protean.fields import Integer, String -from protean.reflection import fields +from protean.reflection import declared_fields class Person(BaseAggregate): @@ -11,9 +11,9 @@ class Person(BaseAggregate): age = Integer() -def test_fields(): - assert len(fields(Person)) == 4 - assert all(key in fields(Person) for key in ["name", "age", "id", "_version"]) +def test_declared_fields(): + assert len(declared_fields(Person)) == 3 + assert all(key in declared_fields(Person) for key in ["name", "age", "id"]) def test_fields_on_non_element(): @@ -21,7 +21,7 @@ class Dummy: pass with pytest.raises(IncorrectUsageError) as exception: - fields(Dummy) + declared_fields(Dummy) assert exception.value.messages == { "field": [ diff --git a/tests/subscription/test_message_filtering_with_origin_stream.py b/tests/subscription/test_message_filtering_with_origin_stream.py index 469953ac..d02a4255 100644 --- a/tests/subscription/test_message_filtering_with_origin_stream.py +++ b/tests/subscription/test_message_filtering_with_origin_stream.py @@ -65,13 +65,22 @@ def record_sent_email(self, event: Sent) -> None: pass +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) + test_domain.register(UserEventHandler, part_of=User) + + test_domain.register(Email) + test_domain.register(Sent, part_of=Email) + test_domain.register(EmailEventHandler, stream_name="email", source_stream="user") + + @pytest.mark.asyncio async def test_message_filtering_for_event_handlers_with_defined_origin_stream( test_domain, ): - test_domain.register(UserEventHandler, part_of=User) - test_domain.register(EmailEventHandler, stream_name="email", source_stream="user") - engine = Engine(test_domain, test_mode=True) email_event_handler_subscription = engine._subscriptions[fqn(EmailEventHandler)] diff --git a/tests/subscription/test_no_message_filtering.py b/tests/subscription/test_no_message_filtering.py index 12acbbde..3b5345ed 100644 --- a/tests/subscription/test_no_message_filtering.py +++ b/tests/subscription/test_no_message_filtering.py @@ -65,13 +65,21 @@ def record_sent_email(self, event: Sent) -> None: pass +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) + test_domain.register(UserEventHandler, part_of=User) + test_domain.register(Email) + test_domain.register(Sent, part_of=Email) + test_domain.register(EmailEventHandler, stream_name="email") + + @pytest.mark.asyncio async def test_no_filtering_for_event_handlers_without_defined_origin_stream( test_domain, ): - test_domain.register(UserEventHandler, part_of=User) - test_domain.register(EmailEventHandler, stream_name="email") - engine = Engine(test_domain, test_mode=True) email_event_handler_subscription = engine._subscriptions[fqn(EmailEventHandler)] diff --git a/tests/value_object/test_to_dict.py b/tests/value_object/test_to_dict.py index 70314666..b336b04a 100644 --- a/tests/value_object/test_to_dict.py +++ b/tests/value_object/test_to_dict.py @@ -27,7 +27,7 @@ class EntityWithDateTimeVO(BaseAggregate): class TestAsDict: def test_empty_simple_vo(self): simple = SimpleVOEntity(id=12) - assert simple.to_dict() == {"id": 12} + assert simple.to_dict() == {"_version": -1, "id": 12} def test_simple_vo_dict(self): vo = SimpleVO(foo="foo", bar="bar") @@ -36,7 +36,11 @@ def test_simple_vo_dict(self): def test_embedded_simple_vo(self): vo = SimpleVO(foo="foo", bar="bar") simple = SimpleVOEntity(id=12, vo=vo) - assert simple.to_dict() == {"id": 12, "vo": {"foo": "foo", "bar": "bar"}} + assert simple.to_dict() == { + "_version": -1, + "id": 12, + "vo": {"foo": "foo", "bar": "bar"}, + } def test_datetime_vo_dict(self): now = datetime.now(UTC) @@ -47,4 +51,8 @@ def test_embedded_datetime_vo(self): now = datetime.now(UTC) vo = VOWithDateTime(foo="foo", now=now) simple = EntityWithDateTimeVO(id=12, vo=vo) - assert simple.to_dict() == {"id": 12, "vo": {"foo": "foo", "now": str(now)}} + assert simple.to_dict() == { + "_version": -1, + "id": 12, + "vo": {"foo": "foo", "now": str(now)}, + }