diff --git a/docs/core-concepts/building-blocks/events.md b/docs/core-concepts/building-blocks/events.md index 1fce445f..e8277ee2 100644 --- a/docs/core-concepts/building-blocks/events.md +++ b/docs/core-concepts/building-blocks/events.md @@ -7,6 +7,11 @@ consistent and informed. ## Facts +### Events are always associated with aggregates. { data-toc-label="Linked to Aggregates" } +An event is always associated to the aggregate that emits it. Events of an +event type are emitted to the aggregate stream that the event type is +associated with. + ### Events are essentially Data Transfer Objects (DTO). { data-toc-label="Data Transfer Objects" } They can only hold simple fields and Value Objects. @@ -97,6 +102,11 @@ or notifying external consumers for choice events, like `LowInventoryAlert`. They are also appropriate for composing a custom view of the state based on events (for example in Command Query Resource Separation). +#### Multiple Event Types + +Aggregates usually emit events of multiple delta event types. Each event +is individually associated with the aggregate. + ### Fact Events A fact event encloses the entire state of the aggregate at that specific point @@ -112,6 +122,7 @@ multiple delta event types, which can be risky and error-prone, especially as data schemas evolve and change over time. Instead, they rely on the owning service to compute and produce a fully detailed fact event. + ## Persistence ### Events are stored in an Event Store. { data-toc-label="Event Store" } diff --git a/docs/stylesheets/extra.css b/docs/stylesheets/extra.css index ce767212..df67b07f 100644 --- a/docs/stylesheets/extra.css +++ b/docs/stylesheets/extra.css @@ -23,7 +23,7 @@ p a, article ul li a { } /* Primary color and tighter space */ -.md-typeset h1, .md-typeset h2, .md-typeset h3 { +.md-typeset h1, .md-typeset h2, .md-typeset h3, .md-typeset h4 { color: var(--md-primary-fg-color); letter-spacing: -.025em; } diff --git a/src/protean/adapters/event_store/__init__.py b/src/protean/adapters/event_store/__init__.py index c460285d..4e21d45c 100644 --- a/src/protean/adapters/event_store/__init__.py +++ b/src/protean/adapters/event_store/__init__.py @@ -31,38 +31,32 @@ def __init__(self, domain): @property def store(self): - if self._event_store is None: - self._initialize() - return self._event_store def _initialize(self): - if not self._event_store: - logger.debug("Initializing Event Store...") - - configured_event_store = self.domain.config["event_store"] - if configured_event_store and isinstance(configured_event_store, dict): - event_store_full_path = EVENT_STORE_PROVIDERS[ - configured_event_store["provider"] - ] - event_store_module, event_store_class = event_store_full_path.rsplit( - ".", maxsplit=1 - ) + logger.debug("Initializing Event Store...") + + configured_event_store = self.domain.config["event_store"] + if configured_event_store and isinstance(configured_event_store, dict): + event_store_full_path = EVENT_STORE_PROVIDERS[ + configured_event_store["provider"] + ] + event_store_module, event_store_class = event_store_full_path.rsplit( + ".", maxsplit=1 + ) - event_store_cls = getattr( - importlib.import_module(event_store_module), event_store_class - ) + event_store_cls = getattr( + importlib.import_module(event_store_module), event_store_class + ) - store = event_store_cls(self.domain, configured_event_store) - else: - raise ConfigurationError( - "Configure at least one event store in the domain" - ) + store = event_store_cls(self.domain, configured_event_store) + else: + raise ConfigurationError("Configure at least one event store in the domain") - self._event_store = store + self._event_store = store - self._initialize_event_streams() - self._initialize_command_streams() + self._initialize_event_streams() + self._initialize_command_streams() return self._event_store @@ -85,9 +79,6 @@ def _initialize_command_streams(self): ) def repository_for(self, part_of): - if self._event_store is None: - self._initialize() - repository_cls = type( part_of.__name__ + "Repository", (BaseEventSourcedRepository,), {} ) @@ -97,20 +88,12 @@ def repository_for(self, part_of): return repository_cls(self.domain) def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: - if self._event_streams is None: - self._initialize_event_streams() - + """Return all handlers configured to run on the given event.""" + # Gather handlers configured to run on all events all_stream_handlers = self._event_streams.get("$all", set()) - # Get the Aggregate's stream_name - aggregate_stream_name = None - if event.meta_.aggregate_cluster: - aggregate_stream_name = event.meta_.aggregate_cluster.meta_.stream_name - - stream_name = event.meta_.stream_name or aggregate_stream_name - stream_handlers = self._event_streams.get(stream_name, set()) - - # Gather all handlers that are configured to run on this event + # Gather all handlers configured to run on this event + stream_handlers = self._event_streams.get(event.meta_.stream_name, set()) configured_stream_handlers = set() for stream_handler in stream_handlers: if fqn(event.__class__) in stream_handler._handlers: @@ -119,9 +102,6 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: return set.union(configured_stream_handlers, all_stream_handlers) def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]: - if self._command_streams is None: - self._initialize_command_streams() - stream_name = command.meta_.stream_name or ( command.meta_.part_of.meta_.stream_name if command.meta_.part_of else None ) diff --git a/src/protean/adapters/event_store/memory.py b/src/protean/adapters/event_store/memory.py index 30dff9ec..1317649d 100644 --- a/src/protean/adapters/event_store/memory.py +++ b/src/protean/adapters/event_store/memory.py @@ -2,10 +2,11 @@ from typing import Any, Dict, List from protean.core.aggregate import BaseAggregate +from protean.core.event import Metadata from protean.core.repository import BaseRepository from protean.globals import current_domain from protean.port.event_store import BaseEventStore -from protean.utils.mixins import MessageMetadata, MessageRecord +from protean.utils.mixins import MessageRecord class MemoryMessage(BaseAggregate, MessageRecord): @@ -52,7 +53,7 @@ def write( position=next_position, type=message_type, data=data, - metadata=MessageMetadata(**metadata) if metadata else None, + metadata=metadata, time=datetime.now(UTC), ) ) diff --git a/src/protean/container.py b/src/protean/container.py index 9dd85355..edbfec3a 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -13,7 +13,7 @@ ValidationError, ) from protean.fields import Auto, Field, FieldBase, ValueObject -from protean.globals import current_domain +from protean.globals import g from protean.reflection import id_field from protean.utils import generate_identity @@ -396,10 +396,11 @@ def raise_(self, event, fact_event=False) -> None: event_with_metadata = event.__class__( event.to_dict(), _metadata={ - "id": ( - f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}" - f".{identifier}.{self._version}" - ), + "id": (f"{self.meta_.stream_name}-{identifier}-{self._version}"), + "type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}", + "kind": "EVENT", + "stream_name": self.meta_.stream_name, + "origin_stream_name": event._metadata.origin_stream_name, "timestamp": event._metadata.timestamp, "version": event._metadata.version, "sequence_id": self._version, diff --git a/src/protean/core/command.py b/src/protean/core/command.py index e846e566..62da2aad 100644 --- a/src/protean/core/command.py +++ b/src/protean/core/command.py @@ -1,11 +1,13 @@ from protean.container import BaseContainer, OptionsMixin +from protean.core.event import Metadata from protean.exceptions import ( IncorrectUsageError, InvalidDataError, NotSupportedError, ValidationError, ) -from protean.fields import Field +from protean.fields import Field, ValueObject +from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields from protean.utils import DomainObjects, derive_element_class @@ -24,6 +26,9 @@ def __new__(cls, *args, **kwargs): raise NotSupportedError("BaseCommand cannot be instantiated") return super().__new__(cls) + # Track Metadata + _metadata = ValueObject(Metadata, default=lambda: Metadata()) # pragma: no cover + def __init_subclass__(subclass) -> None: super().__init_subclass__() @@ -32,7 +37,30 @@ def __init_subclass__(subclass) -> None: def __init__(self, *args, **kwargs): try: - super().__init__(*args, **kwargs) + super().__init__(*args, finalize=False, **kwargs) + + version = ( + self.__class__.__version__ + if hasattr(self.__class__, "__version__") + else "v1" + ) + + origin_stream_name = None + if hasattr(g, "message_in_context"): + if g.message_in_context.metadata.kind == "EVENT": + origin_stream_name = g.message_in_context.stream_name + + # Value Objects are immutable, so we create a clone/copy and associate it + self._metadata = Metadata( + self._metadata.to_dict(), # Template + kind="COMMAND", + origin_stream_name=origin_stream_name, + version=version, + ) + + # Finally lock the event and make it immutable + self._initialized = True + except ValidationError as exception: raise InvalidDataError(exception.messages) @@ -50,11 +78,22 @@ def __setattr__(self, name, value): @classmethod def _default_options(cls): + part_of = ( + getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None + ) + + # This method is called during class import, so we cannot use part_of if it + # is still a string. We ignore it for now, and resolve `stream_name` later + # when the domain has resolved references. + # FIXME A better mechanism would be to not set stream_name here, unless explicitly + # specified, and resolve it during `domain.init()` + part_of = None if isinstance(part_of, str) else part_of + return [ ("abstract", False), ("aggregate_cluster", None), ("part_of", None), - ("stream_name", None), + ("stream_name", part_of.meta_.stream_name if part_of else None), ] @classmethod diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index e8134876..0e1ad4ab 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -11,7 +11,7 @@ from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError from protean.fields import Auto, HasMany, Reference, ValueObject from protean.fields.association import Association -from protean.globals import current_domain +from protean.globals import g from protean.reflection import ( _FIELDS, attributes, @@ -449,9 +449,12 @@ def raise_(self, event) -> None: event.to_dict(), _metadata={ "id": ( - f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}" - f".{identifier}.{aggregate_version}.{event_number}" + f"{self._root.meta_.stream_name}-{identifier}-{aggregate_version}.{event_number}" ), + "type": f"{self._root.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}", + "kind": "EVENT", + "stream_name": self._root.meta_.stream_name, + "origin_stream_name": event._metadata.origin_stream_name, "timestamp": event._metadata.timestamp, "version": event._metadata.version, "sequence_id": f"{aggregate_version}.{event_number}", diff --git a/src/protean/core/event.py b/src/protean/core/event.py index 3c015f4b..e7e454d8 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -6,6 +6,7 @@ from protean.core.value_object import BaseValueObject from protean.exceptions import IncorrectUsageError, NotSupportedError from protean.fields import DateTime, Field, Integer, String, ValueObject +from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields, fields from protean.utils import DomainObjects, derive_element_class @@ -17,6 +18,20 @@ class Metadata(BaseValueObject): # Format is .... id = String() + # Type of the event + # Format is .. + type = String() + + # Kind of the object + # Can be one of "EVENT", "COMMAND" + kind = String() + + # Name of the stream to which the event/command is written + stream_name = String() + + # Name of the stream that originated this event/command + origin_stream_name = String() + # Time of event generation timestamp = DateTime(default=lambda: datetime.now(timezone.utc)) @@ -114,11 +129,27 @@ def __track_id_field(subclass): def __init__(self, *args, **kwargs): super().__init__(*args, finalize=False, **kwargs) - if hasattr(self.__class__, "__version__"): - # Value Objects are immutable, so we create a clone/copy and associate it - self._metadata = Metadata( - self._metadata.to_dict(), version=self.__class__.__version__ - ) + version = ( + self.__class__.__version__ + if hasattr(self.__class__, "__version__") + else "v1" + ) + + origin_stream_name = None + if hasattr(g, "message_in_context"): + if ( + g.message_in_context.metadata.kind == "COMMAND" + and g.message_in_context.metadata.origin_stream_name is not None + ): + origin_stream_name = g.message_in_context.metadata.origin_stream_name + + # Value Objects are immutable, so we create a clone/copy and associate it + self._metadata = Metadata( + self._metadata.to_dict(), # Template + kind="EVENT", + origin_stream_name=origin_stream_name, + version=version, + ) # Finally lock the event and make it immutable self._initialized = True diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index 356b775a..a0192590 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -298,6 +298,7 @@ def _initialize(self): self.providers._initialize() self.caches._initialize() self.brokers._initialize() + self.event_store._initialize() def make_config(self): """Used to construct the config; invoked by the Domain constructor.""" diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index d541e598..35b6191b 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -10,7 +10,7 @@ from protean import fields from protean.container import BaseContainer, OptionsMixin from protean.core.command import BaseCommand -from protean.core.event import BaseEvent +from protean.core.event import BaseEvent, Metadata from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate from protean.core.unit_of_work import UnitOfWork from protean.core.value_object import BaseValueObject @@ -28,26 +28,6 @@ class MessageType(Enum): READ_POSITION = "READ_POSITION" -class MessageMetadata(BaseValueObject): - # Marks message as a `COMMAND` or an `EVENT` - kind = fields.String(required=True, max_length=15, choices=MessageType) - - # Name of service that owns the contract of the message - owner = fields.String(max_length=50) - - # Allows for parsing of different versions, in case of - # breaking changes. - schema_version = fields.Integer() - - # `origin_stream_name` helps keep track of the origin of a message. - # A command created by an event is automatically associated with the original stream. - # Events raised subsequently by the commands also carry forward the original stream name. - origin_stream_name = fields.String() - - # FIXME Provide mechanism to add custom metadata fields/structure - # Can come handy in case of multi-tenancy, etc. - - class MessageRecord(BaseContainer): """ Base Container holding all fields of a message. @@ -77,7 +57,7 @@ class MessageRecord(BaseContainer): data = fields.Dict() # JSON representation of the message metadata - metadata = fields.ValueObject(MessageMetadata) + metadata = fields.ValueObject(Metadata) class Message(MessageRecord, OptionsMixin): # FIXME Remove OptionsMixin @@ -120,7 +100,7 @@ def from_dict(cls, message: Dict) -> Message: stream_name=message["stream_name"], type=message["type"], data=message["data"], - metadata=MessageMetadata(**message["metadata"]), + metadata=message["metadata"], position=message["position"], global_position=message["global_position"], time=message["time"], @@ -133,35 +113,25 @@ def to_aggregate_event_message( ) -> Message: identifier = getattr(aggregate, id_field(aggregate).field_name) - # Take the Aggregate's stream_name - aggregate_stream_name = None - if event.meta_.aggregate_cluster: - aggregate_stream_name = event.meta_.aggregate_cluster.meta_.stream_name - - # Use explicit stream name if provided, or fallback on Aggregate's stream name - stream_name = event.meta_.stream_name or aggregate_stream_name - - if not stream_name: + if not event.meta_.stream_name: raise ConfigurationError( f"No stream name found for `{event.__class__.__name__}`. " "Either specify an explicit stream name or associate the event with an aggregate." ) + # If this is a Fact Event, don't set an expected version. + # Otherwise, expect the previous version + if event.__class__.__name__.endswith("FactEvent"): + expected_version = None + else: + expected_version = int(event._metadata.sequence_id) - 1 + return cls( - stream_name=f"{stream_name}-{identifier}", + stream_name=f"{event.meta_.stream_name}-{identifier}", type=fully_qualified_name(event.__class__), data=event.to_dict(), - metadata=MessageMetadata( - kind=MessageType.EVENT.value, - owner=current_domain.name, - **cls.derived_metadata(MessageType.EVENT.value), - # schema_version=event.meta_.version, # FIXME Maintain version for event - ), - # If this is a Fact Event, don't set an expected version. - # Otherwise, expect the previous version - expected_version=None - if event.__class__.__name__.endswith("FactEvent") - else int(event._metadata.sequence_id) - 1, + metadata=event._metadata, + expected_version=expected_version, ) def to_object(self) -> Union[BaseEvent, BaseCommand]: @@ -186,22 +156,16 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: else: identifier = str(uuid4()) - # Take the Aggregate's stream_name - aggregate_stream_name = None - if message_object.meta_.aggregate_cluster: - aggregate_stream_name = ( - message_object.meta_.aggregate_cluster.meta_.stream_name + if not message_object.meta_.stream_name: + raise ConfigurationError( + f"No stream name found for `{message_object.__class__.__name__}`. " + "Either specify an explicit stream name or associate the event with an aggregate." ) - # Use explicit stream name if provided, or fallback on Aggregate's stream name - stream_name = message_object.meta_.stream_name or aggregate_stream_name - if isinstance(message_object, BaseEvent): - stream_name = f"{stream_name}-{identifier}" - kind = MessageType.EVENT.value + stream_name = f"{message_object.meta_.stream_name}-{identifier}" elif isinstance(message_object, BaseCommand): - stream_name = f"{stream_name}:command-{identifier}" - kind = MessageType.COMMAND.value + stream_name = f"{message_object.meta_.stream_name}:command-{identifier}" else: raise NotImplementedError # FIXME Handle unknown messages better @@ -209,12 +173,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: stream_name=stream_name, type=fully_qualified_name(message_object.__class__), data=message_object.to_dict(), - metadata=MessageMetadata( - kind=kind, - owner=current_domain.name, - **cls.derived_metadata(kind), - ), - # schema_version=command.meta_.version, # FIXME Maintain version + metadata=message_object._metadata, ) diff --git a/tests/adapters/event_store/memory_event_store/test_memory_message_repository.py b/tests/adapters/event_store/memory_event_store/test_memory_message_repository.py index 99e78611..8fd995cc 100644 --- a/tests/adapters/event_store/memory_event_store/test_memory_message_repository.py +++ b/tests/adapters/event_store/memory_event_store/test_memory_message_repository.py @@ -2,7 +2,6 @@ def test_is_category(test_domain): - test_domain.event_store.store # Establish connection to event store repo = test_domain.repository_for(MemoryMessage) assert repo.is_category("testStream-123") is False assert repo.is_category("testStream") is True diff --git a/tests/adapters/event_store/message_db_event_store/tests.py b/tests/adapters/event_store/message_db_event_store/tests.py index 6d16f60b..b01c66f2 100644 --- a/tests/adapters/event_store/message_db_event_store/tests.py +++ b/tests/adapters/event_store/message_db_event_store/tests.py @@ -7,6 +7,10 @@ @pytest.mark.message_db class TestMessageDBEventStore: + @pytest.fixture(autouse=True) + def initialize_domain(self, test_domain): + test_domain.init(traverse=False) + def test_retrieving_message_store_from_domain(self, test_domain): assert test_domain.event_store is not None assert test_domain.event_store.store is not None @@ -18,6 +22,7 @@ def test_error_on_message_db_initialization(self): domain.config["event_store"]["database_uri"] = ( "postgresql://message_store@localhost:5433/dummy" ) + domain.init(traverse=False) with pytest.raises(ConfigurationError) as exc: domain.event_store.store._write( diff --git a/tests/adapters/model/dict_model/tests.py b/tests/adapters/model/dict_model/tests.py index 93fc2c4c..ae4d5de0 100644 --- a/tests/adapters/model/dict_model/tests.py +++ b/tests/adapters/model/dict_model/tests.py @@ -9,6 +9,7 @@ class TestModel: @pytest.fixture(autouse=True) def register_person_aggregate(self, test_domain): test_domain.register(Person) + test_domain.init(traverse=False) def test_that_model_class_is_created_automatically(self, test_domain): model_cls = test_domain.repository_for(Person)._model diff --git a/tests/aggregate/events/test_aggregate_streams.py b/tests/aggregate/events/test_aggregate_streams.py new file mode 100644 index 00000000..4d818c0e --- /dev/null +++ b/tests/aggregate/events/test_aggregate_streams.py @@ -0,0 +1,100 @@ +import pytest + +from protean import BaseAggregate, BaseEntity, BaseEvent +from protean.fields import HasOne, Identifier, String +from protean.utils.mixins import Message + + +class Account(BaseEntity): + password_hash = String(max_length=512) + + def change_password(self, password): + self.password_hash = password + self.raise_(PasswordChanged(account_id=self.id, user_id=self.user_id)) + + +class PasswordChanged(BaseEvent): + account_id = Identifier(required=True) + user_id = Identifier(required=True) + + +class User(BaseAggregate): + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=["ACTIVE", "ARCHIVED"]) + + account = HasOne(Account) + + def activate(self): + self.raise_(UserActivated(user_id=self.id)) + + def change_name(self, name): + self.raise_(UserRenamed(user_id=self.id, name=name)) + + +class UserActivated(BaseEvent): + user_id = Identifier(identifier=True) + + +class UserRenamed(BaseEvent): + user_id = Identifier(identifier=True) + name = String(required=True, max_length=50) + + +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User, fact_events=True) + test_domain.register(Account, part_of=User) + test_domain.register(UserActivated, part_of=User) + test_domain.register(UserRenamed, part_of=User) + test_domain.register(PasswordChanged, part_of=User) + test_domain.init(traverse=False) + + +class TestDeltaEvents: + def test_aggregate_stream_name(self): + assert User.meta_.stream_name == "user" + + def test_event_metadata(self): + user = User(name="John Doe", email="john.doe@example.com") + user.change_name("Jane Doe") + user.activate() + + assert len(user._events) == 2 + assert user._events[0]._metadata.id == f"user-{user.id}-0.1" + assert user._events[0]._metadata.type == "User.UserRenamed.v1" + assert user._events[0]._metadata.version == "v1" + assert user._events[0]._metadata.sequence_id == "0.1" + + assert user._events[1]._metadata.id == f"user-{user.id}-0.2" + assert user._events[1]._metadata.type == "User.UserActivated.v1" + assert user._events[1]._metadata.version == "v1" + assert user._events[1]._metadata.sequence_id == "0.2" + + def test_event_stream_name_in_message(self): + user = User(name="John Doe", email="john.doe@example.com") + user.change_name("Jane Doe") + + message = Message.to_message(user._events[0]) + + assert message.stream_name == f"user-{user.id}" + + def test_event_metadata_from_stream(self, test_domain): + user = User(name="John Doe", email="john.doe@example.com") + user.change_name("Jane Doe") + user.activate() + + test_domain.repository_for(User).add(user) + + event_messages = test_domain.event_store.store.read(f"user-{user.id}") + assert len(event_messages) == 2 + + assert event_messages[0].metadata.id == f"user-{user.id}-0.1" + assert event_messages[0].metadata.type == "User.UserRenamed.v1" + assert event_messages[0].metadata.version == "v1" + assert event_messages[0].metadata.sequence_id == "0.1" + + assert event_messages[1].metadata.id == f"user-{user.id}-0.2" + assert event_messages[1].metadata.type == "User.UserActivated.v1" + assert event_messages[1].metadata.version == "v1" + assert event_messages[1].metadata.sequence_id == "0.2" diff --git a/tests/aggregate/events/test_event_regular_metadata_id_and_sequence.py b/tests/aggregate/events/test_event_regular_metadata_id_and_sequence.py index f1bd2f12..4b5453bd 100644 --- a/tests/aggregate/events/test_event_regular_metadata_id_and_sequence.py +++ b/tests/aggregate/events/test_event_regular_metadata_id_and_sequence.py @@ -56,7 +56,7 @@ def test_initialization_with_first_event(): user = User(name="John Doe", email="john.doe@example.com") user.activate() - assert user._events[0]._metadata.id == f"Test.User.v1.{user.id}.0.1" + assert user._events[0]._metadata.id == f"user-{user.id}-0.1" assert user._events[0]._metadata.sequence_id == "0.1" @@ -65,9 +65,9 @@ def test_initialization_with_multiple_events(): user.activate() user.change_name("Jane Doe") - assert user._events[0]._metadata.id == f"Test.User.v1.{user.id}.0.1" + assert user._events[0]._metadata.id == f"user-{user.id}-0.1" assert user._events[0]._metadata.sequence_id == "0.1" - assert user._events[1]._metadata.id == f"Test.User.v1.{user.id}.0.2" + assert user._events[1]._metadata.id == f"user-{user.id}-0.2" assert user._events[1]._metadata.sequence_id == "0.2" @@ -79,10 +79,7 @@ def test_one_event_after_persistence(test_domain): refreshed_user = test_domain.repository_for(User).get(user.id) refreshed_user.change_name("Jane Doe") - assert ( - refreshed_user._events[0]._metadata.id - == f"Test.User.v1.{refreshed_user.id}.1.1" - ) + assert refreshed_user._events[0]._metadata.id == f"user-{refreshed_user.id}-1.1" assert refreshed_user._events[0]._metadata.sequence_id == "1.1" @@ -95,15 +92,9 @@ def test_multiple_events_after_persistence(test_domain): refreshed_user.change_name("Jane Doe") refreshed_user.change_name("Baby Doe") - assert ( - refreshed_user._events[0]._metadata.id - == f"Test.User.v1.{refreshed_user.id}.1.1" - ) + assert refreshed_user._events[0]._metadata.id == f"user-{refreshed_user.id}-1.1" assert refreshed_user._events[0]._metadata.sequence_id == "1.1" - assert ( - refreshed_user._events[1]._metadata.id - == f"Test.User.v1.{refreshed_user.id}.1.2" - ) + assert refreshed_user._events[1]._metadata.id == f"user-{refreshed_user.id}-1.2" assert refreshed_user._events[1]._metadata.sequence_id == "1.2" @@ -121,13 +112,7 @@ def test_multiple_events_after_multiple_persistence(test_domain): refreshed_user.change_name("Ark Doe") refreshed_user.change_name("Zing Doe") - assert ( - refreshed_user._events[0]._metadata.id - == f"Test.User.v1.{refreshed_user.id}.2.1" - ) + assert refreshed_user._events[0]._metadata.id == f"user-{refreshed_user.id}-2.1" assert refreshed_user._events[0]._metadata.sequence_id == "2.1" - assert ( - refreshed_user._events[1]._metadata.id - == f"Test.User.v1.{refreshed_user.id}.2.2" - ) + assert refreshed_user._events[1]._metadata.id == f"user-{refreshed_user.id}-2.2" assert refreshed_user._events[1]._metadata.sequence_id == "2.2" diff --git a/tests/aggregate/test_aggregate_events.py b/tests/aggregate/events/test_raising_aggregate_events.py similarity index 100% rename from tests/aggregate/test_aggregate_events.py rename to tests/aggregate/events/test_raising_aggregate_events.py diff --git a/tests/aggregate/events/test_raising_fact_events.py b/tests/aggregate/events/test_raising_fact_events.py index cf4f3813..e8952276 100644 --- a/tests/aggregate/events/test_raising_fact_events.py +++ b/tests/aggregate/events/test_raising_fact_events.py @@ -1,26 +1,19 @@ import pytest -from protean import BaseAggregate, BaseEntity -from protean.fields import HasOne, String +from protean import BaseAggregate +from protean.fields import String from protean.utils.mixins import Message -class Account(BaseEntity): - password_hash = String(max_length=512) - - class User(BaseAggregate): name = String(max_length=50, required=True) email = String(required=True) status = String(choices=["ACTIVE", "ARCHIVED"]) - account = HasOne(Account) - @pytest.fixture(autouse=True) def register_elements(test_domain): test_domain.register(User, fact_events=True) - test_domain.register(Account, part_of=User) test_domain.init(traverse=False) @@ -47,7 +40,7 @@ def test_generation_of_first_fact_event_on_persistence(event): def test_fact_event_version_metadata(event): - assert event._metadata.id.endswith(".0.1") + assert event._metadata.id.endswith("-0.1") assert event._metadata.sequence_id == "0.1" assert event._version == 0 @@ -67,6 +60,6 @@ def test_fact_event_version_metadata_after_second_edit(test_domain): # Deserialize event event = Message.to_object(event_messages[1]) - assert event._metadata.id.endswith(".1.1") + assert event._metadata.id.endswith("-1.1") assert event._metadata.sequence_id == "1.1" assert event._version == 1 diff --git a/tests/command_handler/test_inline_command_processing.py b/tests/command_handler/test_inline_command_processing.py index e1c8290e..958e0d9f 100644 --- a/tests/command_handler/test_inline_command_processing.py +++ b/tests/command_handler/test_inline_command_processing.py @@ -29,6 +29,7 @@ def register(self, event: Register) -> None: def test_that_command_can_be_processed_inline(test_domain): test_domain.register(User) test_domain.register(UserCommandHandlers, part_of=User) + test_domain.init(traverse=False) assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value @@ -39,6 +40,7 @@ def test_that_command_can_be_processed_inline(test_domain): def test_that_command_is_persisted_in_message_store(test_domain): test_domain.register(User) test_domain.register(UserCommandHandlers, part_of=User) + test_domain.init(traverse=False) identifier = str(uuid4()) test_domain.process(Register(user_id=identifier, email="john.doe@gmail.com")) diff --git a/tests/command_handler/test_retrieving_handlers_by_command.py b/tests/command_handler/test_retrieving_handlers_by_command.py index 922ddb7e..bc735279 100644 --- a/tests/command_handler/test_retrieving_handlers_by_command.py +++ b/tests/command_handler/test_retrieving_handlers_by_command.py @@ -63,6 +63,7 @@ def test_retrieving_handler_by_command(test_domain): test_domain.register(Post) test_domain.register(Create, part_of=Post) test_domain.register(PostCommandHandler, part_of=Post) + test_domain.init(traverse=False) assert test_domain.command_handler_for(Register()) == UserCommandHandlers assert test_domain.command_handler_for(Create()) == PostCommandHandler @@ -82,6 +83,7 @@ def test_error_on_defining_multiple_handlers_for_a_command(test_domain): test_domain.register(User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.register(AdminUserCommandHandlers, part_of=User) + test_domain.init(traverse=False) with pytest.raises(NotSupportedError) as exc: test_domain.command_handler_for(Register()) diff --git a/tests/conftest.py b/tests/conftest.py index 6952e01c..1478917b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -186,6 +186,9 @@ def test_domain(db_config, store_config, request): domain.config["databases"]["default"] = db_config domain.config["event_store"] = store_config + # We initialize and load default configuration into the domain here + # so that test cases that don't need explicit domain setup can + # still function. domain._initialize() with domain.domain_context(): @@ -232,4 +235,5 @@ def run_around_tests(test_domain): cache = test_domain.caches[cache_name] cache.flush_all() - test_domain.event_store.store._data_reset() + if test_domain.event_store.store: + test_domain.event_store.store._data_reset() diff --git a/tests/domain/test_domain_traversal.py b/tests/domain/test_domain_traversal.py index f9abae7b..c8b67058 100644 --- a/tests/domain/test_domain_traversal.py +++ b/tests/domain/test_domain_traversal.py @@ -22,7 +22,9 @@ def test_loading_domain_with_init(self): assert publishing7.domain is not None publishing7.domain.init() - assert len(publishing7.domain.registry.aggregates) == 1 + assert ( + len(publishing7.domain.registry.aggregates) == 2 + ) # Includes MemoryMessage Aggregate @pytest.mark.no_test_domain def test_loading_nested_domain_with_init(self): @@ -30,7 +32,9 @@ def test_loading_nested_domain_with_init(self): assert publishing13.domain is not None publishing13.domain.init() - assert len(publishing13.domain.registry.aggregates) == 2 + assert ( + len(publishing13.domain.registry.aggregates) == 3 + ) # Includes MemoryMessage Aggregate @pytest.mark.no_test_domain @@ -54,7 +58,7 @@ def test_all_elements_in_nested_structure_are_registered(self): assert domain.name == "Publishing20" domain.init() - assert len(domain.registry.aggregates) == 2 + assert len(domain.registry.aggregates) == 3 # Includes MemoryMessage Aggregate def test_elements_in_folder_with_their_own_toml_are_ignored(self): change_working_directory_to("test21") @@ -64,4 +68,4 @@ def test_elements_in_folder_with_their_own_toml_are_ignored(self): assert domain.name == "Publishing21" domain.init() - assert len(domain.registry.aggregates) == 1 + assert len(domain.registry.aggregates) == 2 # Includes MemoryMessage Aggregate diff --git a/tests/domain/test_init.py b/tests/domain/test_init.py index c28372a2..e63ff12d 100644 --- a/tests/domain/test_init.py +++ b/tests/domain/test_init.py @@ -1,36 +1,60 @@ -import mock - - -def test_domain_init_calls_validate_domain(test_domain): - mock_validate_domain = mock.Mock() - test_domain._validate_domain = mock_validate_domain - test_domain.init(traverse=False) - mock_validate_domain.assert_called_once() - - -def test_domain_init_calls_traverse(test_domain): - mock_traverse = mock.Mock() - test_domain._traverse = mock_traverse - test_domain.init() - mock_traverse.assert_called_once() - - -def test_domain_init_does_not_call_traverse_when_false(test_domain): - mock_traverse = mock.Mock() - test_domain._traverse = mock_traverse - test_domain.init(traverse=False) - mock_traverse.assert_not_called() - - -def test_domain_init_calls_resolve_references(test_domain): - mock_resolve_references = mock.Mock() - test_domain._resolve_references = mock_resolve_references - test_domain.init(traverse=False) - mock_resolve_references.assert_called_once() - - -def test_domain_init_constructs_fact_events(test_domain): - mock_generate_fact_event_classes = mock.Mock() - test_domain._generate_fact_event_classes = mock_generate_fact_event_classes - test_domain.init(traverse=False) - mock_generate_fact_event_classes.assert_called_once() +from mock import Mock, patch + +from protean.adapters.broker import Brokers +from protean.adapters.cache import Caches +from protean.adapters.event_store import EventStore +from protean.adapters.repository import Providers + + +class TestDomainInitMethodCalls: + def test_domain_init_calls_validate_domain(self, test_domain): + mock_validate_domain = Mock() + test_domain._validate_domain = mock_validate_domain + test_domain.init(traverse=False) + mock_validate_domain.assert_called_once() + + def test_domain_init_calls_traverse(self, test_domain): + mock_traverse = Mock() + test_domain._traverse = mock_traverse + test_domain.init() + mock_traverse.assert_called_once() + + def test_domain_init_does_not_call_traverse_when_false(self, test_domain): + mock_traverse = Mock() + test_domain._traverse = mock_traverse + test_domain.init(traverse=False) + mock_traverse.assert_not_called() + + def test_domain_init_calls_resolve_references(self, test_domain): + mock_resolve_references = Mock() + test_domain._resolve_references = mock_resolve_references + test_domain.init(traverse=False) + mock_resolve_references.assert_called_once() + + def test_domain_init_constructs_fact_events(self, test_domain): + mock_generate_fact_event_classes = Mock() + test_domain._generate_fact_event_classes = mock_generate_fact_event_classes + test_domain.init(traverse=False) + mock_generate_fact_event_classes.assert_called_once() + + +class TestDomainInitializationCalls: + @patch.object(Providers, "_initialize") + def test_domain_initializes_providers(self, mock_initialize, test_domain): + test_domain._initialize() + mock_initialize.assert_called_once() + + @patch.object(Brokers, "_initialize") + def test_domain_initializes_brokers(self, mock_initialize, test_domain): + test_domain._initialize() + mock_initialize.assert_called_once() + + @patch.object(Caches, "_initialize") + def test_domain_initializes_caches(self, mock_initialize, test_domain): + test_domain._initialize() + mock_initialize.assert_called_once() + + @patch.object(EventStore, "_initialize") + def test_domain_initializes_event_store(self, mock_initialize, test_domain): + test_domain._initialize() + mock_initialize.assert_called_once() diff --git a/tests/email_provider/elements.py b/tests/email_provider/elements.py index 2cda760b..8cb09a19 100644 --- a/tests/email_provider/elements.py +++ b/tests/email_provider/elements.py @@ -21,7 +21,9 @@ def add_newcomer(cls, person_dict): ) # Publish Event via the domain - current_domain.publish(PersonAdded(**newcomer.to_dict())) + payload = newcomer.to_dict() + payload.pop("_version") + current_domain.publish(PersonAdded(**payload)) return newcomer diff --git a/tests/entity/test_lifecycle_methods.py b/tests/entity/test_lifecycle_methods.py index da5a04bf..f42afe81 100644 --- a/tests/entity/test_lifecycle_methods.py +++ b/tests/entity/test_lifecycle_methods.py @@ -8,6 +8,12 @@ class TestDefaults: + @pytest.fixture(autouse=True) + def register_elements(self, test_domain): + test_domain.register(Area) + test_domain.register(Building, part_of=Area) + test_domain.init(traverse=False) + def test_that_building_is_marked_as_done_if_above_4_floors(self): building = Building(name="Foo", floors=4) diff --git a/tests/event/test_event_metadata.py b/tests/event/test_event_metadata.py index 5c2f4d67..35fa2cbf 100644 --- a/tests/event/test_event_metadata.py +++ b/tests/event/test_event_metadata.py @@ -79,11 +79,15 @@ def test_event_metadata(): assert event._metadata is not None assert isinstance(event._metadata.timestamp, datetime) - assert event._metadata.id == f"Test.User.v1.{user.id}.0" + assert event._metadata.id == f"user-{user.id}-0" assert event.to_dict() == { "_metadata": { - "id": f"Test.User.v1.{user.id}.0", + "id": f"user-{user.id}-0", + "type": "User.UserLoggedIn.v1", + "kind": "EVENT", + "stream_name": "user", + "origin_stream_name": None, "timestamp": str(event._metadata.timestamp), "version": "v1", "sequence_id": "0", diff --git a/tests/event/test_event_payload.py b/tests/event/test_event_payload.py index 10eae7ca..5c61b1e8 100644 --- a/tests/event/test_event_payload.py +++ b/tests/event/test_event_payload.py @@ -36,7 +36,11 @@ def test_event_payload(): assert event.to_dict() == { "_metadata": { - "id": f"Test.User.v1.{user_id}.0", + "id": f"user-{user_id}-0", + "type": "User.UserLoggedIn.v1", + "kind": "EVENT", + "stream_name": "user", + "origin_stream_name": None, "timestamp": str(event._metadata.timestamp), "version": "v1", "sequence_id": "0", diff --git a/tests/event/test_stream_name_derivation.py b/tests/event/test_stream_name_derivation.py new file mode 100644 index 00000000..f407442c --- /dev/null +++ b/tests/event/test_stream_name_derivation.py @@ -0,0 +1,35 @@ +import pytest + +from protean import BaseAggregate, BaseEvent +from protean.fields import String +from protean.fields.basic import Identifier + + +class User(BaseAggregate): + email = String() + name = String() + + +class UserLoggedIn(BaseEvent): + user_id = Identifier(identifier=True) + + +def test_stream_name_from_part_of(test_domain): + test_domain.register(User) + test_domain.register(UserLoggedIn, part_of=User) + + assert UserLoggedIn.meta_.stream_name == "user" + + +def test_stream_name_from_explicit_stream_name_in_aggregate(test_domain): + test_domain.register(User, stream_name="authentication") + test_domain.register(UserLoggedIn, part_of=User) + + assert UserLoggedIn.meta_.stream_name == "authentication" + + +def test_stream_name_from_explicit_stream_name(test_domain): + test_domain.register(User) + test_domain.register(UserLoggedIn, stream_name="authentication") + + assert UserLoggedIn.meta_.stream_name == "authentication" diff --git a/tests/event/tests.py b/tests/event/tests.py index 4dee98e7..ad4bf6e6 100644 --- a/tests/event/tests.py +++ b/tests/event/tests.py @@ -46,6 +46,10 @@ class UserAdded(BaseEvent): == { "_metadata": { "id": None, # ID is none because the event is not being raised in the proper way (with `_raise`) + "type": None, # Type is none here because of the same reason as above + "kind": "EVENT", + "stream_name": None, # Type is none here because of the same reason as above + "origin_stream_name": None, "timestamp": str(event._metadata.timestamp), "version": "v1", "sequence_id": None, # Sequence is unknown as event is not being raised as part of an aggregate diff --git a/tests/event_handler/test_consuming_fact_events.py b/tests/event_handler/test_consuming_fact_events.py new file mode 100644 index 00000000..cee2f5ef --- /dev/null +++ b/tests/event_handler/test_consuming_fact_events.py @@ -0,0 +1,30 @@ +import pytest + +from protean import BaseAggregate, BaseEventHandler, BaseView, handle +from protean.fields import String +from protean.utils.mixins import Message + + +class User(BaseAggregate): + name = String(max_length=50, required=True) + email = String(required=True) + status = String(choices=["ACTIVE", "ARCHIVED"]) + + +class UserView(BaseView): + id = String(identifier=True) + name = String(max_length=50, required=True) + email = String(required=True) + status = String(required=True) + + +class ManageUserView(BaseEventHandler): + @handle("Test.UserFact.v1") + def record_user_fact_event(self, message: Message) -> None: + pass + + +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User, fact_events=True) + test_domain.init(traverse=False) diff --git a/tests/event_handler/test_retrieving_handlers_by_event.py b/tests/event_handler/test_retrieving_handlers_by_event.py index 76f84d9d..df33f74c 100644 --- a/tests/event_handler/test_retrieving_handlers_by_event.py +++ b/tests/event_handler/test_retrieving_handlers_by_event.py @@ -85,17 +85,20 @@ def register_elements(test_domain): def test_retrieving_handler_by_event(test_domain): + test_domain._initialize() assert test_domain.handlers_for(Registered()) == {UserEventHandler, UserMetrics} assert test_domain.handlers_for(Sent()) == {EmailEventHandler} def test_that_all_streams_handler_is_returned(test_domain): test_domain.register(AllEventsHandler, stream_name="$all") + test_domain._initialize() assert test_domain.handlers_for(Renamed()) == {AllEventsHandler} def test_that_all_streams_handler_is_always_returned_with_other_handlers(test_domain): test_domain.register(AllEventsHandler, stream_name="$all") + test_domain._initialize() assert test_domain.handlers_for(Registered()) == { UserEventHandler, diff --git a/tests/event_sourced_aggregates/events/test_event_es_metadata_id_and_sequence.py b/tests/event_sourced_aggregates/events/test_event_es_metadata_id_and_sequence.py index 48dd7406..9bad33bc 100644 --- a/tests/event_sourced_aggregates/events/test_event_es_metadata_id_and_sequence.py +++ b/tests/event_sourced_aggregates/events/test_event_es_metadata_id_and_sequence.py @@ -33,5 +33,5 @@ def test_event_is_generated_with_unique_id(): user.login() event = user._events[0] - assert event._metadata.id == f"Test.User.v1.{identifier}.0" + assert event._metadata.id == f"user-{identifier}-0" assert event._metadata.sequence_id == "0" diff --git a/tests/event_sourced_aggregates/events/test_fact_event_generation.py b/tests/event_sourced_aggregates/events/test_fact_event_generation.py index 020dcf24..62b3db58 100644 --- a/tests/event_sourced_aggregates/events/test_fact_event_generation.py +++ b/tests/event_sourced_aggregates/events/test_fact_event_generation.py @@ -64,7 +64,7 @@ def test_generation_of_first_fact_event_on_persistence(test_domain): assert event.email == "john.doe@example.com" # Check event versions - assert event._metadata.id.endswith(".0") + assert event._metadata.id.endswith("-0") assert event._metadata.sequence_id == "0" assert event._version == 0 @@ -101,7 +101,7 @@ def test_generation_of_subsequent_fact_events_after_fetch(test_domain): assert event.__class__.__name__ == "UserFactEvent" assert event.name == "John Doe" - assert event._metadata.id.endswith(".0") + assert event._metadata.id.endswith("-0") assert event._metadata.sequence_id == "0" assert event._version == 0 @@ -111,6 +111,6 @@ def test_generation_of_subsequent_fact_events_after_fetch(test_domain): assert event.__class__.__name__ == "UserFactEvent" assert event.name == "Jane Doe" - assert event._metadata.id.endswith(".1") + assert event._metadata.id.endswith("-1") assert event._metadata.sequence_id == "1" assert event._version == 1 diff --git a/tests/event_sourced_aggregates/test_generated_event_version.py b/tests/event_sourced_aggregates/test_generated_event_version.py index fe1f2487..afc90683 100644 --- a/tests/event_sourced_aggregates/test_generated_event_version.py +++ b/tests/event_sourced_aggregates/test_generated_event_version.py @@ -71,7 +71,7 @@ def register_elements(test_domain): def test_aggregate_and_event_version_on_initialization(): user = User.register(user_id="1", name="John Doe", email="john.doe@example.com") assert user._version == 0 - assert user._events[0]._metadata.id.endswith(".0") + assert user._events[0]._metadata.id.endswith("-0") assert user._events[0]._metadata.sequence_id == "0" @@ -88,7 +88,7 @@ def test_aggregate_and_event_version_after_first_persistence(test_domain): # Deserialize event event = Message.to_object(event_messages[0]) - assert event._metadata.id.endswith(".0") + assert event._metadata.id.endswith("-0") assert event._metadata.sequence_id == "0" @@ -112,7 +112,7 @@ def test_aggregate_and_event_version_after_first_persistence_after_multiple_pers # Deserialize event event = Message.to_object(event_messages[-1]) - assert event._metadata.id.endswith(".10") + assert event._metadata.id.endswith("-10") assert event._metadata.sequence_id == "10" @@ -124,9 +124,9 @@ def test_aggregate_and_event_version_after_multiple_event_generation_in_one_upda # Check event versions before persistence assert user._version == 1 - assert user._events[0]._metadata.id.endswith(".0") + assert user._events[0]._metadata.id.endswith("-0") assert user._events[0]._metadata.sequence_id == "0" - assert user._events[1]._metadata.id.endswith(".1") + assert user._events[1]._metadata.id.endswith("-1") assert user._events[1]._metadata.sequence_id == "1" # Persist user just once @@ -143,7 +143,7 @@ def test_aggregate_and_event_version_after_multiple_event_generation_in_one_upda event1 = Message.to_object(event_messages[0]) event2 = Message.to_object(event_messages[1]) - assert event1._metadata.id.endswith(".0") + assert event1._metadata.id.endswith("-0") assert event1._metadata.sequence_id == "0" - assert event2._metadata.id.endswith(".1") + assert event2._metadata.id.endswith("-1") assert event2._metadata.sequence_id == "1" diff --git a/tests/event_store/test_appending_commands.py b/tests/event_store/test_appending_commands.py index 57419d22..c7da2384 100644 --- a/tests/event_store/test_appending_commands.py +++ b/tests/event_store/test_appending_commands.py @@ -22,6 +22,7 @@ class Register(BaseCommand): def test_command_submission_without_aggregate(test_domain): test_domain.register(User) + test_domain.init(traverse=False) with pytest.raises(IncorrectUsageError) as exc: test_domain.register(Register) diff --git a/tests/event_store/test_appending_events.py b/tests/event_store/test_appending_events.py index 911c7fa9..dcc5fa90 100644 --- a/tests/event_store/test_appending_events.py +++ b/tests/event_store/test_appending_events.py @@ -16,6 +16,7 @@ class UserLoggedIn(BaseEvent): @pytest.mark.eventstore def test_appending_raw_events(test_domain): test_domain.register(UserLoggedIn, stream_name="authentication") + test_domain.init(traverse=False) identifier = str(uuid4()) event = UserLoggedIn(user_id=identifier) diff --git a/tests/event_store/test_deriving_category.py b/tests/event_store/test_deriving_category.py index 865f4086..32099566 100644 --- a/tests/event_store/test_deriving_category.py +++ b/tests/event_store/test_deriving_category.py @@ -3,6 +3,8 @@ @pytest.mark.eventstore def test_deriving_category(test_domain): + test_domain.init(traverse=False) + assert test_domain.event_store.store.category(None) == "" assert test_domain.event_store.store.category("") == "" diff --git a/tests/event_store/test_event_store_adapter_initialization.py b/tests/event_store/test_event_store_adapter_initialization.py index 64769d16..8c3500a5 100644 --- a/tests/event_store/test_event_store_adapter_initialization.py +++ b/tests/event_store/test_event_store_adapter_initialization.py @@ -10,6 +10,6 @@ def test_domain_event_store_attribute(test_domain): @mock.patch("protean.adapters.event_store.EventStore._initialize") def test_event_store_initialization(mock_store_initialize, test_domain): - test_domain.event_store.store # Initializes store if not initialized already + test_domain._initialize() mock_store_initialize.assert_called_once() diff --git a/tests/event_store/test_inline_event_processing_on_publish.py b/tests/event_store/test_inline_event_processing_on_publish.py index e5d1a4ad..0d1720fe 100644 --- a/tests/event_store/test_inline_event_processing_on_publish.py +++ b/tests/event_store/test_inline_event_processing_on_publish.py @@ -38,6 +38,7 @@ def registered(self, _: Registered) -> None: def test_inline_event_processing_on_publish_in_sync_mode(test_domain): test_domain.register(Registered, stream_name="user") test_domain.register(UserEventHandler, stream_name="user") + test_domain.init(traverse=False) current_domain.publish( Registered( diff --git a/tests/event_store/test_reading_all_streams.py b/tests/event_store/test_reading_all_streams.py index dcede259..11a14c7c 100644 --- a/tests/event_store/test_reading_all_streams.py +++ b/tests/event_store/test_reading_all_streams.py @@ -80,6 +80,8 @@ def register_elements(test_domain): test_domain.register(Created, part_of=Post) test_domain.register(Published, part_of=Post) + test_domain.init(traverse=False) + @pytest.mark.eventstore def test_reading_messages_from_all_streams(test_domain): diff --git a/tests/event_store/test_streams_initialization.py b/tests/event_store/test_streams_initialization.py index 299b3e6a..03b5ff10 100644 --- a/tests/event_store/test_streams_initialization.py +++ b/tests/event_store/test_streams_initialization.py @@ -60,11 +60,10 @@ def register(test_domain): test_domain.register(Email) test_domain.register(UserEventHandler, part_of=User) test_domain.register(EmailEventHandler, part_of=Email) + test_domain.init(traverse=False) def test_streams_initialization(test_domain): - test_domain.event_store.store # Initializes store if not initialized already - assert len(test_domain.event_store._event_streams) == 2 assert all( stream_name in test_domain.event_store._event_streams diff --git a/tests/message/test_object_to_message.py b/tests/message/test_object_to_message.py index 888a0692..59ef36e3 100644 --- a/tests/message/test_object_to_message.py +++ b/tests/message/test_object_to_message.py @@ -62,7 +62,6 @@ def test_construct_message_from_event(test_domain): assert message.type == fully_qualified_name(Registered) assert message.stream_name == f"{User.meta_.stream_name}-{identifier}" assert message.metadata.kind == "EVENT" - assert message.metadata.owner == test_domain.name assert message.data == user._events[-1].to_dict() assert message.time is None assert message.expected_version == user._version - 1 @@ -72,7 +71,6 @@ def test_construct_message_from_event(test_domain): assert message_dict["type"] == fully_qualified_name(Registered) assert message_dict["metadata"]["kind"] == "EVENT" - assert message_dict["metadata"]["owner"] == test_domain.name assert message_dict["stream_name"] == f"{User.meta_.stream_name}-{identifier}" assert message_dict["data"] == user._events[-1].to_dict() assert message_dict["time"] is None @@ -94,7 +92,6 @@ def test_construct_message_from_command(test_domain): assert message.type == fully_qualified_name(Register) assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" - assert message.metadata.owner == test_domain.name assert message.data == command.to_dict() assert message.time is None @@ -102,7 +99,6 @@ def test_construct_message_from_command(test_domain): message_dict = message.to_dict() assert message_dict["type"] == fully_qualified_name(Register) assert message_dict["metadata"]["kind"] == "COMMAND" - assert message_dict["metadata"]["owner"] == test_domain.name assert ( message_dict["stream_name"] == f"{User.meta_.stream_name}:command-{identifier}" ) diff --git a/tests/message/test_origin_stream_name_in_metadata.py b/tests/message/test_origin_stream_name_in_metadata.py index 6d578a8b..7a25f5d8 100644 --- a/tests/message/test_origin_stream_name_in_metadata.py +++ b/tests/message/test_origin_stream_name_in_metadata.py @@ -3,10 +3,11 @@ import pytest from protean import BaseCommand, BaseEvent, BaseEventSourcedAggregate +from protean.core.event import Metadata from protean.fields import String from protean.fields.basic import Identifier from protean.globals import g -from protean.utils.mixins import Message, MessageMetadata +from protean.utils.mixins import Message class User(BaseEventSourcedAggregate): @@ -76,9 +77,9 @@ def test_origin_stream_name_in_event_from_command_without_origin_stream_name(use def test_origin_stream_name_in_event_from_command_with_origin_stream_name(user_id): command_message = register_command_message(user_id) - command_message.metadata = MessageMetadata( + command_message.metadata = Metadata( command_message.metadata.to_dict(), origin_stream_name="foo" - ) # MessageMetadata is a VO and immutable, so creating a copy with updated value + ) # Metadata is a VO and immutable, so creating a copy with updated value g.message_in_context = command_message event_message = Message.to_message( @@ -118,9 +119,9 @@ def test_origin_stream_name_in_aggregate_event_from_command_with_origin_stream_n ): command_message = register_command_message(user_id) - command_message.metadata = MessageMetadata( + command_message.metadata = Metadata( command_message.metadata.to_dict(), origin_stream_name="foo" - ) # MessageMetadata is a VO and immutable, so creating a copy with updated value + ) # Metadata is a VO and immutable, so creating a copy with updated value g.message_in_context = command_message user = User( diff --git a/tests/subscription/test_message_filtering_with_origin_stream.py b/tests/subscription/test_message_filtering_with_origin_stream.py index d02a4255..14cfd481 100644 --- a/tests/subscription/test_message_filtering_with_origin_stream.py +++ b/tests/subscription/test_message_filtering_with_origin_stream.py @@ -7,10 +7,11 @@ import pytest from protean import BaseEvent, BaseEventHandler, BaseEventSourcedAggregate, handle +from protean.core.event import Metadata from protean.fields import DateTime, Identifier, String from protean.server import Engine from protean.utils import fqn -from protean.utils.mixins import Message, MessageMetadata +from protean.utils.mixins import Message class User(BaseEventSourcedAggregate): @@ -98,9 +99,9 @@ async def test_message_filtering_for_event_handlers_with_defined_origin_stream( Message.to_aggregate_event_message(email, email._events[0]), ] - messages[2].metadata = MessageMetadata( + messages[2].metadata = Metadata( messages[2].metadata.to_dict(), origin_stream_name=f"user-{identifier}" - ) # MessageMetadata is a VO and immutable, so creating a copy with updated value + ) # Metadata is a VO and immutable, so creating a copy with updated value # Mock `read` method and have it return the 3 messages mock_store_read = mock.Mock() diff --git a/tests/subscription/test_no_message_filtering.py b/tests/subscription/test_no_message_filtering.py index 3b5345ed..1436232c 100644 --- a/tests/subscription/test_no_message_filtering.py +++ b/tests/subscription/test_no_message_filtering.py @@ -7,10 +7,11 @@ import pytest from protean import BaseEvent, BaseEventHandler, BaseEventSourcedAggregate, handle +from protean.core.event import Metadata from protean.fields import DateTime, Identifier, String from protean.server import Engine from protean.utils import fqn -from protean.utils.mixins import Message, MessageMetadata +from protean.utils.mixins import Message class User(BaseEventSourcedAggregate): @@ -97,9 +98,9 @@ async def test_no_filtering_for_event_handlers_without_defined_origin_stream( Message.to_aggregate_event_message(email, email._events[0]), ] - messages[2].metadata = MessageMetadata( + messages[2].metadata = Metadata( messages[2].metadata.to_dict(), origin_stream_name=f"user-{identifier}" - ) # MessageMetadata is a VO and immutable, so creating a copy with updated value + ) # Metadata is a VO and immutable, so creating a copy with updated value # Mock `read` method and have it return the 3 messages mock_store_read = mock.Mock() diff --git a/tests/unit_of_work/test_inline_event_processing.py b/tests/unit_of_work/test_inline_event_processing.py index 4d9dcd34..f54eeca0 100644 --- a/tests/unit_of_work/test_inline_event_processing.py +++ b/tests/unit_of_work/test_inline_event_processing.py @@ -96,6 +96,7 @@ def test_inline_event_processing_in_sync_mode(test_domain): test_domain.register(Registered, part_of=User) test_domain.register(UserEventHandler, part_of=User) test_domain.register(UserMetrics, part_of=User) + test_domain.init(traverse=False) identifier = str(uuid4()) UserCommandHandler().register_user( diff --git a/tests/unit_of_work/test_nested_inline_event_processing.py b/tests/unit_of_work/test_nested_inline_event_processing.py index 8f4f9268..55bbbe66 100644 --- a/tests/unit_of_work/test_nested_inline_event_processing.py +++ b/tests/unit_of_work/test_nested_inline_event_processing.py @@ -99,6 +99,7 @@ def test_nested_uow_processing(test_domain): test_domain.register(Published, part_of=Post) test_domain.register(PostEventHandler, part_of=Post) test_domain.register(Metrics, stream_name="post") + test_domain.init(traverse=False) identifier = str(uuid4()) PostCommandHandler().create_new_post(