From 841424a51904e75bb6a6bceb2e9fd87809b229f6 Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Fri, 5 Jul 2024 10:20:25 -0700 Subject: [PATCH] Stream enhancements - Part 2 Changes: - Ensure stream names have aggregate id within them - Use `stream_name` in Event/Command metadata as-is for persistence into event store - Base handlers and handler retreival on stream name - Ensure commands always belong to an aggregate cluster - Remove `stream_name` option from events and and commands All elements in an aggregate cluster should use the aggregate's stream name - Enhance `Domain.process` to enrich command before processing --- src/protean/adapters/event_store/__init__.py | 14 ++- src/protean/adapters/event_store/memory.py | 1 - src/protean/container.py | 19 ++- src/protean/core/command.py | 28 ++--- src/protean/core/command_handler.py | 13 +- src/protean/core/entity.py | 13 +- src/protean/core/event.py | 17 +-- src/protean/domain/__init__.py | 55 +++++++-- src/protean/utils/mixins.py | 52 +------- .../test_automatic_stream_association.py | 10 +- tests/command/test_command_meta.py | 15 +-- ...st_handle_decorator_in_command_handlers.py | 5 + .../test_inline_command_processing.py | 2 + .../test_retrieving_handlers_by_command.py | 10 +- .../test_automatic_stream_association.py | 115 ------------------ tests/event/test_event_meta.py | 9 -- tests/event/test_event_metadata.py | 2 +- tests/event/test_event_part_of_resolution.py | 5 +- tests/event/test_event_payload.py | 2 +- tests/event/test_event_properties.py | 7 ++ tests/event/test_raising_events.py | 4 +- tests/event/test_stream_name_derivation.py | 13 +- tests/event/tests.py | 20 ++- .../test_event_association_with_aggregate.py | 7 +- ...tiple_events_for_one_aggregate_in_a_uow.py | 2 + .../event_sourced_repository/test_add_uow.py | 1 + tests/event_store/test_appending_commands.py | 2 +- tests/event_store/test_appending_events.py | 15 ++- ...test_inline_event_processing_on_publish.py | 29 +++-- tests/message/test_object_to_message.py | 25 ++-- .../test_origin_stream_name_in_metadata.py | 54 ++++---- tests/server/test_engine_run.py | 27 ++-- .../test_read_position_updates.py | 2 +- tests/test_brokers.py | 46 ++++--- 34 files changed, 301 insertions(+), 340 deletions(-) delete mode 100644 tests/event/test_automatic_stream_association.py diff --git a/src/protean/adapters/event_store/__init__.py b/src/protean/adapters/event_store/__init__.py index 4e21d45c..be7be755 100644 --- a/src/protean/adapters/event_store/__init__.py +++ b/src/protean/adapters/event_store/__init__.py @@ -93,7 +93,9 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: all_stream_handlers = self._event_streams.get("$all", set()) # Gather all handlers configured to run on this event - stream_handlers = self._event_streams.get(event.meta_.stream_name, set()) + stream_handlers = self._event_streams.get( + event.meta_.part_of.meta_.stream_name, set() + ) configured_stream_handlers = set() for stream_handler in stream_handlers: if fqn(event.__class__) in stream_handler._handlers: @@ -102,12 +104,12 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: return set.union(configured_stream_handlers, all_stream_handlers) def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]: - stream_name = command.meta_.stream_name or ( - command.meta_.part_of.meta_.stream_name if command.meta_.part_of else None - ) + if not command.meta_.part_of: + raise ConfigurationError( + f"Command `{command.__name__}` needs to be associated with an aggregate" + ) - if not stream_name: - return None + stream_name = command.meta_.part_of.meta_.stream_name handler_classes = self._command_streams.get(stream_name, set()) diff --git a/src/protean/adapters/event_store/memory.py b/src/protean/adapters/event_store/memory.py index 1317649d..aa558b5c 100644 --- a/src/protean/adapters/event_store/memory.py +++ b/src/protean/adapters/event_store/memory.py @@ -2,7 +2,6 @@ from typing import Any, Dict, List from protean.core.aggregate import BaseAggregate -from protean.core.event import Metadata from protean.core.repository import BaseRepository from protean.globals import current_domain from protean.port.event_store import BaseEventStore diff --git a/src/protean/container.py b/src/protean/container.py index edbfec3a..1447bc63 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -8,12 +8,12 @@ from typing import Any, Type, Union from protean.exceptions import ( + ConfigurationError, InvalidDataError, NotSupportedError, ValidationError, ) from protean.fields import Auto, Field, FieldBase, ValueObject -from protean.globals import g from protean.reflection import id_field from protean.utils import generate_identity @@ -388,18 +388,31 @@ def raise_(self, event, fact_event=False) -> None: Event is immutable, so we clone a new event object from the event raised, and add the enhanced metadata to it. """ + # Verify that event is indeed associated with this aggregate + if event.meta_.part_of != self.__class__: + raise ConfigurationError( + f"Event `{event.__class__.__name__}` is not associated with " + f"aggregate `{self.__class__.__name__}`" + ) + if not fact_event: self._version += 1 identifier = getattr(self, id_field(self).field_name) + # Set Fact Event stream to be `-fact` + if event.__class__.__name__.endswith("FactEvent"): + stream_name = f"{self.meta_.stream_name}-fact" + else: + stream_name = self.meta_.stream_name + event_with_metadata = event.__class__( event.to_dict(), _metadata={ - "id": (f"{self.meta_.stream_name}-{identifier}-{self._version}"), + "id": (f"{stream_name}-{identifier}-{self._version}"), "type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}", "kind": "EVENT", - "stream_name": self.meta_.stream_name, + "stream_name": f"{stream_name}-{identifier}", "origin_stream_name": event._metadata.origin_stream_name, "timestamp": event._metadata.timestamp, "version": event._metadata.version, diff --git a/src/protean/core/command.py b/src/protean/core/command.py index 62da2aad..f0380a81 100644 --- a/src/protean/core/command.py +++ b/src/protean/core/command.py @@ -8,7 +8,7 @@ ) from protean.fields import Field, ValueObject from protean.globals import g -from protean.reflection import _ID_FIELD_NAME, declared_fields +from protean.reflection import _ID_FIELD_NAME, declared_fields, fields from protean.utils import DomainObjects, derive_element_class @@ -64,6 +64,15 @@ def __init__(self, *args, **kwargs): except ValidationError as exception: raise InvalidDataError(exception.messages) + @property + def payload(self): + """Return the payload of the event.""" + return { + field_name: field_obj.as_dict(getattr(self, field_name, None)) + for field_name, field_obj in fields(self).items() + if field_name not in {"_metadata"} + } + def __setattr__(self, name, value): if not hasattr(self, "_initialized") or not self._initialized: return super().__setattr__(name, value) @@ -78,22 +87,10 @@ def __setattr__(self, name, value): @classmethod def _default_options(cls): - part_of = ( - getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None - ) - - # This method is called during class import, so we cannot use part_of if it - # is still a string. We ignore it for now, and resolve `stream_name` later - # when the domain has resolved references. - # FIXME A better mechanism would be to not set stream_name here, unless explicitly - # specified, and resolve it during `domain.init()` - part_of = None if isinstance(part_of, str) else part_of - return [ ("abstract", False), ("aggregate_cluster", None), ("part_of", None), - ("stream_name", part_of.meta_.stream_name if part_of else None), ] @classmethod @@ -119,10 +116,7 @@ def __track_id_field(subclass): def command_factory(element_cls, **kwargs): element_cls = derive_element_class(element_cls, BaseCommand, **kwargs) - if ( - not (element_cls.meta_.part_of or element_cls.meta_.stream_name) - and not element_cls.meta_.abstract - ): + if not element_cls.meta_.part_of and not element_cls.meta_.abstract: raise IncorrectUsageError( { "_command": [ diff --git a/src/protean/core/command_handler.py b/src/protean/core/command_handler.py index 41edd118..2a2e3706 100644 --- a/src/protean/core/command_handler.py +++ b/src/protean/core/command_handler.py @@ -75,12 +75,23 @@ def command_handler_factory(element_cls, **kwargs): } ) + # Throw error if target_cls is not associated with an aggregate + if not method._target_cls.meta_.part_of: + raise IncorrectUsageError( + { + "_command_handler": [ + f"Command `{method._target_cls.__name__}` in Command Handler `{element_cls.__name__}` " + "is not associated with an aggregate" + ] + } + ) + # Associate Command with the handler's stream # Order of preference: # 1. Stream name defined in command # 2. Stream name derived from aggregate associated with command handler method._target_cls.meta_.stream_name = ( - method._target_cls.meta_.stream_name + method._target_cls.meta_.part_of.meta_.stream_name or element_cls.meta_.part_of.meta_.stream_name ) diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index 0e1ad4ab..f58719b1 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -11,7 +11,6 @@ from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError from protean.fields import Auto, HasMany, Reference, ValueObject from protean.fields.association import Association -from protean.globals import g from protean.reflection import ( _FIELDS, attributes, @@ -443,17 +442,23 @@ def raise_(self, event) -> None: # in the same edit session event_number = len(self._root._events) + 1 - identifier = getattr(self, id_field(self).field_name) + identifier = getattr(self._root, id_field(self._root).field_name) + + # Set Fact Event stream to be `-fact` + if event.__class__.__name__.endswith("FactEvent"): + stream_name = f"{self._root.meta_.stream_name}-fact" + else: + stream_name = self._root.meta_.stream_name event_with_metadata = event.__class__( event.to_dict(), _metadata={ "id": ( - f"{self._root.meta_.stream_name}-{identifier}-{aggregate_version}.{event_number}" + f"{stream_name}-{identifier}-{aggregate_version}.{event_number}" ), "type": f"{self._root.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}", "kind": "EVENT", - "stream_name": self._root.meta_.stream_name, + "stream_name": f"{stream_name}-{identifier}", "origin_stream_name": event._metadata.origin_stream_name, "timestamp": event._metadata.timestamp, "version": event._metadata.version, diff --git a/src/protean/core/event.py b/src/protean/core/event.py index e7e454d8..c6c4afff 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -89,22 +89,10 @@ def __setattr__(self, name, value): @classmethod def _default_options(cls): - part_of = ( - getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None - ) - - # This method is called during class import, so we cannot use part_of if it - # is still a string. We ignore it for now, and resolve `stream_name` later - # when the domain has resolved references. - # FIXME A better mechanism would be to not set stream_name here, unless explicitly - # specified, and resolve it during `domain.init()` - part_of = None if isinstance(part_of, str) else part_of - return [ ("abstract", False), ("aggregate_cluster", None), ("part_of", None), - ("stream_name", part_of.meta_.stream_name if part_of else None), ] @classmethod @@ -189,10 +177,7 @@ def to_dict(self): def domain_event_factory(element_cls, **kwargs): element_cls = derive_element_class(element_cls, BaseEvent, **kwargs) - if ( - not (element_cls.meta_.part_of or element_cls.meta_.stream_name) - and not element_cls.meta_.abstract - ): + if not element_cls.meta_.part_of and not element_cls.meta_.abstract: raise IncorrectUsageError( { "_event": [ diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index a0192590..1a4f4d64 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -3,11 +3,13 @@ """ import inspect +import json import logging import sys from collections import defaultdict from functools import lru_cache from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from uuid import uuid4 from werkzeug.datastructures import ImmutableDict @@ -27,7 +29,8 @@ NotSupportedError, ) from protean.fields import HasMany, HasOne, Reference, ValueObject -from protean.reflection import declared_fields, has_fields +from protean.globals import g +from protean.reflection import declared_fields, has_fields, id_field from protean.utils import ( CommandProcessing, DomainObjects, @@ -804,11 +807,7 @@ def _generate_fact_event_classes(self): for _, element in self.registry._elements[element_type.value].items(): if element.cls.meta_.fact_events: event_cls = element_to_fact_event(element.cls) - self.register( - event_cls, - part_of=element.cls, - stream_name=element.cls.meta_.stream_name + "-fact", - ) + self.register(event_cls, part_of=element.cls) ###################### # Element Decorators # @@ -935,6 +934,47 @@ def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None: ##################### # Handling Commands # ##################### + def _enrich_command(self, command: BaseCommand) -> BaseCommand: + # Enrich Command + identifier = None + identity_field = id_field(command) + if identity_field: + identifier = getattr(command, identity_field.field_name) + else: + identifier = str(uuid4()) + + stream_name = f"{command.meta_.part_of.meta_.stream_name}:command-{identifier}" + + origin_stream_name = None + if hasattr(g, "message_in_context"): + if g.message_in_context.metadata.kind == "EVENT": + origin_stream_name = g.message_in_context.stream_name + + command_with_metadata = command.__class__( + command.to_dict(), + _metadata={ + "id": (str(uuid4())), + "type": ( + f"{command.meta_.part_of.__class__.__name__}.{command.__class__.__name__}." + f"{command._metadata.version}" + ), + "kind": "EVENT", + "stream_name": stream_name, + "origin_stream_name": origin_stream_name, + "timestamp": command._metadata.timestamp, + "version": command._metadata.version, + "sequence_id": None, + "payload_hash": hash( + json.dumps( + command.payload, + sort_keys=True, + ) + ), + }, + ) + + return command_with_metadata + def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[Any]: """Process command and return results based on specified preference. @@ -950,7 +990,8 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A Returns: Optional[Any]: Returns either the command handler's return value or nothing, based on preference. """ - position = self.event_store.store.append(command) + command_with_metadata = self._enrich_command(command) + position = self.event_store.store.append(command_with_metadata) if ( not asynchronous diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index 35b6191b..314a53d5 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -5,7 +5,6 @@ from collections import defaultdict from enum import Enum from typing import Callable, Dict, Union -from uuid import uuid4 from protean import fields from protean.container import BaseContainer, OptionsMixin @@ -13,10 +12,8 @@ from protean.core.event import BaseEvent, Metadata from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate from protean.core.unit_of_work import UnitOfWork -from protean.core.value_object import BaseValueObject from protean.exceptions import ConfigurationError -from protean.globals import current_domain, g -from protean.reflection import has_id_field, id_field +from protean.globals import current_domain from protean.utils import fully_qualified_name logger = logging.getLogger(__name__) @@ -71,29 +68,6 @@ class Message(MessageRecord, OptionsMixin): # FIXME Remove OptionsMixin # Version that the stream is expected to be when the message is written expected_version = fields.Integer() - @classmethod - def derived_metadata(cls, new_message_type: str) -> Dict: - additional_metadata = {} - - if hasattr(g, "message_in_context"): - if ( - new_message_type == "COMMAND" - and g.message_in_context.metadata.kind == "EVENT" - ): - additional_metadata["origin_stream_name"] = ( - g.message_in_context.stream_name - ) - - if ( - new_message_type == "EVENT" - and g.message_in_context.metadata.kind == "COMMAND" - and g.message_in_context.metadata.origin_stream_name is not None - ): - additional_metadata["origin_stream_name"] = ( - g.message_in_context.metadata.origin_stream_name - ) - return additional_metadata - @classmethod def from_dict(cls, message: Dict) -> Message: return Message( @@ -111,14 +85,6 @@ def from_dict(cls, message: Dict) -> Message: def to_aggregate_event_message( cls, aggregate: BaseEventSourcedAggregate, event: BaseEvent ) -> Message: - identifier = getattr(aggregate, id_field(aggregate).field_name) - - if not event.meta_.stream_name: - raise ConfigurationError( - f"No stream name found for `{event.__class__.__name__}`. " - "Either specify an explicit stream name or associate the event with an aggregate." - ) - # If this is a Fact Event, don't set an expected version. # Otherwise, expect the previous version if event.__class__.__name__.endswith("FactEvent"): @@ -127,7 +93,7 @@ def to_aggregate_event_message( expected_version = int(event._metadata.sequence_id) - 1 return cls( - stream_name=f"{event.meta_.stream_name}-{identifier}", + stream_name=event._metadata.stream_name, type=fully_qualified_name(event.__class__), data=event.to_dict(), metadata=event._metadata, @@ -151,23 +117,13 @@ def to_object(self) -> Union[BaseEvent, BaseCommand]: @classmethod def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: - if has_id_field(message_object): - identifier = getattr(message_object, id_field(message_object).field_name) - else: - identifier = str(uuid4()) - - if not message_object.meta_.stream_name: + if not message_object.meta_.part_of.meta_.stream_name: raise ConfigurationError( f"No stream name found for `{message_object.__class__.__name__}`. " "Either specify an explicit stream name or associate the event with an aggregate." ) - if isinstance(message_object, BaseEvent): - stream_name = f"{message_object.meta_.stream_name}-{identifier}" - elif isinstance(message_object, BaseCommand): - stream_name = f"{message_object.meta_.stream_name}:command-{identifier}" - else: - raise NotImplementedError # FIXME Handle unknown messages better + stream_name = message_object._metadata.stream_name return cls( stream_name=stream_name, diff --git a/tests/command/test_automatic_stream_association.py b/tests/command/test_automatic_stream_association.py index 8c71ec36..83bb4183 100644 --- a/tests/command/test_automatic_stream_association.py +++ b/tests/command/test_automatic_stream_association.py @@ -86,10 +86,10 @@ def register(test_domain): test_domain.register(Login, part_of=User) test_domain.register(Register, part_of=User) test_domain.register(Activate, part_of=User) - test_domain.register(Subscribe, stream_name="subscriptions") + test_domain.register(Subscribe, part_of=User) test_domain.register(Email) test_domain.register(Send, part_of=Email) - test_domain.register(Recall, part_of=Email, stream_name="recalls") + test_domain.register(Recall, part_of=Email) test_domain.register(UserCommandHandler, part_of=User) test_domain.register(EmailCommandHandler, part_of=Email) @@ -101,11 +101,11 @@ def test_automatic_association_of_events_with_aggregate_and_stream(): assert Activate.meta_.part_of is User assert Activate.meta_.stream_name == "user" - assert Subscribe.meta_.part_of is None - assert Subscribe.meta_.stream_name == "subscriptions" + assert Subscribe.meta_.part_of is User + assert Subscribe.meta_.stream_name == "user" assert Send.meta_.part_of is Email assert Send.meta_.stream_name == "email" assert Recall.meta_.part_of is Email - assert Recall.meta_.stream_name == "recalls" + assert Recall.meta_.stream_name == "email" diff --git a/tests/command/test_command_meta.py b/tests/command/test_command_meta.py index 1f76324e..8b5a250a 100644 --- a/tests/command/test_command_meta.py +++ b/tests/command/test_command_meta.py @@ -67,8 +67,10 @@ def test_command_associated_with_aggregate(test_domain): @pytest.mark.eventstore -def test_command_associated_with_stream_name(test_domain): - test_domain.register(Register, stream_name="foo") +def test_command_associated_with_aggregate_with_custom_stream_name(test_domain): + test_domain.register(User, stream_name="foo") + test_domain.register(Register, part_of=User) + test_domain.init(traverse=False) identifier = str(uuid4()) test_domain.process( @@ -91,12 +93,3 @@ def test_aggregate_cluster_of_event(test_domain): test_domain.init(traverse=False) assert Register.meta_.aggregate_cluster == User - - -def test_no_aggregate_cluster_for_command_with_stream(test_domain): - class SendEmail(BaseCommand): - email = String() - - test_domain.register(SendEmail, stream_name="email") - - assert SendEmail.meta_.aggregate_cluster is None diff --git a/tests/command_handler/test_handle_decorator_in_command_handlers.py b/tests/command_handler/test_handle_decorator_in_command_handlers.py index 127b1f27..3c5a6e85 100644 --- a/tests/command_handler/test_handle_decorator_in_command_handlers.py +++ b/tests/command_handler/test_handle_decorator_in_command_handlers.py @@ -29,6 +29,7 @@ def register(self, command: Register) -> None: pass test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) assert fully_qualified_name(Register) in UserCommandHandlers._handlers @@ -45,7 +46,10 @@ def update_billing_address(self, event: ChangeAddress) -> None: pass test_domain.register(User) + test_domain.register(Register, part_of=User) + test_domain.register(ChangeAddress, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) + test_domain.init(traverse=False) assert len(UserCommandHandlers._handlers) == 2 assert all( @@ -82,6 +86,7 @@ def provision_user_account(self, event: Register) -> None: with pytest.raises(NotSupportedError) as exc: test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) assert ( diff --git a/tests/command_handler/test_inline_command_processing.py b/tests/command_handler/test_inline_command_processing.py index 958e0d9f..baf3ed32 100644 --- a/tests/command_handler/test_inline_command_processing.py +++ b/tests/command_handler/test_inline_command_processing.py @@ -28,6 +28,7 @@ def register(self, event: Register) -> None: def test_that_command_can_be_processed_inline(test_domain): test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) @@ -39,6 +40,7 @@ def test_that_command_can_be_processed_inline(test_domain): def test_that_command_is_persisted_in_message_store(test_domain): test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) diff --git a/tests/command_handler/test_retrieving_handlers_by_command.py b/tests/command_handler/test_retrieving_handlers_by_command.py index bc735279..738c0478 100644 --- a/tests/command_handler/test_retrieving_handlers_by_command.py +++ b/tests/command_handler/test_retrieving_handlers_by_command.py @@ -2,7 +2,7 @@ from protean import BaseCommand, BaseEventSourcedAggregate, handle from protean.core.command_handler import BaseCommandHandler -from protean.exceptions import NotSupportedError +from protean.exceptions import ConfigurationError, NotSupportedError from protean.fields import Identifier, String, Text @@ -76,7 +76,13 @@ def test_for_no_errors_when_no_handler_method_has_not_been_defined_for_a_command def test_retrieving_handlers_for_unknown_command(test_domain): - assert test_domain.command_handler_for(UnknownCommand) is None + with pytest.raises(ConfigurationError) as exc: + test_domain.command_handler_for(UnknownCommand) + + assert ( + exc.value.args[0] + == "Command `UnknownCommand` needs to be associated with an aggregate" + ) def test_error_on_defining_multiple_handlers_for_a_command(test_domain): diff --git a/tests/event/test_automatic_stream_association.py b/tests/event/test_automatic_stream_association.py deleted file mode 100644 index 6d5a2c98..00000000 --- a/tests/event/test_automatic_stream_association.py +++ /dev/null @@ -1,115 +0,0 @@ -from __future__ import annotations - -import pytest - -from protean import BaseEvent, BaseEventHandler, BaseEventSourcedAggregate, handle -from protean.fields import DateTime, Identifier, String - - -class User(BaseEventSourcedAggregate): - email = String() - name = String() - password_hash = String() - - -class Email(BaseEventSourcedAggregate): - email = String() - sent_at = DateTime() - - -class Registered(BaseEvent): - id = Identifier() - email = String() - name = String() - password_hash = String() - - -class Activated(BaseEvent): - id = Identifier() - activated_at = DateTime() - - -class LoggedIn(BaseEvent): - id = Identifier() - activated_at = DateTime() - - -class Subscribed(BaseEvent): - """An event generated by an external system in its own stream, - that is consumed and stored as part of the User aggregate. - """ - - id = Identifier() - - -class Sent(BaseEvent): - email = String() - sent_at = DateTime() - - -class Recalled(BaseEvent): - email = String() - sent_at = DateTime() - - -class UserEventHandler(BaseEventHandler): - @handle(Registered) - def send_activation_email(self, _: Registered) -> None: - pass - - @handle(Activated) - def provision_user(self, _: Activated) -> None: - pass - - @handle(Activated) - def send_welcome_email(self, _: Activated) -> None: - pass - - @handle(LoggedIn) - def record_login(self, _: LoggedIn) -> None: - pass - - @handle(Subscribed) - def subscribed_for_notifications(self, _: Subscribed) -> None: - pass - - -class EmailEventHandler(BaseEventHandler): - @handle(Sent) - def record_sent_email(self, _: Sent) -> None: - pass - - @handle(Recalled) - def record_recalls(self, _: Recalled) -> None: - pass - - -@pytest.fixture(autouse=True) -def register(test_domain): - test_domain.register(User) - test_domain.register(Registered, stream_name="user") - test_domain.register(Activated, stream_name="user") - test_domain.register(LoggedIn, part_of=User) - test_domain.register(Subscribed, stream_name="subscriptions") - test_domain.register(Email) - test_domain.register(Sent, stream_name="email") - test_domain.register(Recalled, part_of=Email, stream_name="recalls") - test_domain.register(UserEventHandler, part_of=User) - test_domain.register(EmailEventHandler, part_of=Email) - - -def test_automatic_association_of_events_with_aggregate_and_stream(): - assert Registered.meta_.part_of is None - assert Registered.meta_.stream_name == "user" - - assert Activated.meta_.part_of is None - assert Activated.meta_.stream_name == "user" - - assert Subscribed.meta_.part_of is None - assert Subscribed.meta_.stream_name == "subscriptions" - - assert Sent.meta_.part_of is None - assert Sent.meta_.stream_name == "email" - - assert Recalled.meta_.part_of is Email - assert Recalled.meta_.stream_name == "recalls" diff --git a/tests/event/test_event_meta.py b/tests/event/test_event_meta.py index df2979c4..81457de5 100644 --- a/tests/event/test_event_meta.py +++ b/tests/event/test_event_meta.py @@ -52,12 +52,3 @@ def test_that_part_of_is_resolved_correctly(): def test_aggregate_cluster_of_event(): assert UserLoggedIn.meta_.aggregate_cluster == User - - -def test_no_aggregate_cluster_for_command_with_stream(test_domain): - class EmailSent(BaseEvent): - email = String() - - test_domain.register(EmailSent, stream_name="email") - - assert EmailSent.meta_.aggregate_cluster is None diff --git a/tests/event/test_event_metadata.py b/tests/event/test_event_metadata.py index 35fa2cbf..c7c04fc2 100644 --- a/tests/event/test_event_metadata.py +++ b/tests/event/test_event_metadata.py @@ -86,7 +86,7 @@ def test_event_metadata(): "id": f"user-{user.id}-0", "type": "User.UserLoggedIn.v1", "kind": "EVENT", - "stream_name": "user", + "stream_name": f"user-{user.id}", "origin_stream_name": None, "timestamp": str(event._metadata.timestamp), "version": "v1", diff --git a/tests/event/test_event_part_of_resolution.py b/tests/event/test_event_part_of_resolution.py index 080a52be..f8a4f15b 100644 --- a/tests/event/test_event_part_of_resolution.py +++ b/tests/event/test_event_part_of_resolution.py @@ -22,10 +22,11 @@ def register_elements(test_domain): def test_event_does_not_have_stream_name_before_domain_init(): - assert UserLoggedIn.meta_.stream_name is None + assert isinstance(UserLoggedIn.meta_.part_of, str) def test_event_has_stream_name_after_domain_init(test_domain): test_domain.init(traverse=False) - assert UserLoggedIn.meta_.stream_name == "user" + assert UserLoggedIn.meta_.part_of == User + assert UserLoggedIn.meta_.part_of.meta_.stream_name == "user" diff --git a/tests/event/test_event_payload.py b/tests/event/test_event_payload.py index 5c61b1e8..23f07229 100644 --- a/tests/event/test_event_payload.py +++ b/tests/event/test_event_payload.py @@ -39,7 +39,7 @@ def test_event_payload(): "id": f"user-{user_id}-0", "type": "User.UserLoggedIn.v1", "kind": "EVENT", - "stream_name": "user", + "stream_name": f"user-{user_id}", "origin_stream_name": None, "timestamp": str(event._metadata.timestamp), "version": "v1", diff --git a/tests/event/test_event_properties.py b/tests/event/test_event_properties.py index 7846dae9..f72b826b 100644 --- a/tests/event/test_event_properties.py +++ b/tests/event/test_event_properties.py @@ -20,6 +20,13 @@ class Registered(BaseEvent): name = String() +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.init(traverse=False) + + def test_that_events_are_immutable(): event = Registered(email="john.doe@gmail.com", name="John Doe", user_id="1234") with pytest.raises(IncorrectUsageError): diff --git a/tests/event/test_raising_events.py b/tests/event/test_raising_events.py index b91daba0..b524c56e 100644 --- a/tests/event/test_raising_events.py +++ b/tests/event/test_raising_events.py @@ -19,8 +19,8 @@ class UserLoggedIn(BaseEvent): @pytest.mark.eventstore def test_raising_event(test_domain): - test_domain.register(User) - test_domain.register(UserLoggedIn, stream_name="authentication") + test_domain.register(User, stream_name="authentication") + test_domain.register(UserLoggedIn, part_of=User) identifier = str(uuid4()) user = User(id=identifier, email="test@example.com", name="Test User") diff --git a/tests/event/test_stream_name_derivation.py b/tests/event/test_stream_name_derivation.py index f407442c..606addbb 100644 --- a/tests/event/test_stream_name_derivation.py +++ b/tests/event/test_stream_name_derivation.py @@ -1,5 +1,3 @@ -import pytest - from protean import BaseAggregate, BaseEvent from protean.fields import String from protean.fields.basic import Identifier @@ -18,18 +16,11 @@ def test_stream_name_from_part_of(test_domain): test_domain.register(User) test_domain.register(UserLoggedIn, part_of=User) - assert UserLoggedIn.meta_.stream_name == "user" + assert UserLoggedIn.meta_.part_of.meta_.stream_name == "user" def test_stream_name_from_explicit_stream_name_in_aggregate(test_domain): test_domain.register(User, stream_name="authentication") test_domain.register(UserLoggedIn, part_of=User) - assert UserLoggedIn.meta_.stream_name == "authentication" - - -def test_stream_name_from_explicit_stream_name(test_domain): - test_domain.register(User) - test_domain.register(UserLoggedIn, stream_name="authentication") - - assert UserLoggedIn.meta_.stream_name == "authentication" + assert UserLoggedIn.meta_.part_of.meta_.stream_name == "authentication" diff --git a/tests/event/tests.py b/tests/event/tests.py index ad4bf6e6..e8b7a9b6 100644 --- a/tests/event/tests.py +++ b/tests/event/tests.py @@ -2,9 +2,9 @@ import pytest -from protean import BaseEvent, BaseValueObject +from protean import BaseAggregate, BaseEvent, BaseValueObject from protean.exceptions import NotSupportedError -from protean.fields import String, ValueObject +from protean.fields import Identifier, String, ValueObject from protean.reflection import data_fields, declared_fields, fields from protean.utils import fully_qualified_name @@ -30,12 +30,23 @@ def test_that_domain_event_can_accommodate_value_objects(self, test_domain): class Email(BaseValueObject): address = String(max_length=255) + class User(BaseAggregate): + email = ValueObject(Email, required=True) + name = String(max_length=50) + class UserAdded(BaseEvent): + id = Identifier(identifier=True) email = ValueObject(Email, required=True) name = String(max_length=50) - test_domain.register(UserAdded, stream_name="user") - event = UserAdded(email_address="john.doe@gmail.com", name="John Doe") + test_domain.register(UserAdded, part_of=User) + + user = User( + id=str(uuid.uuid4()), + email=Email(address="john.doe@gmail.com"), + name="John Doe", + ) + event = UserAdded(id=user.id, email_address=user.email_address, name=user.name) assert event is not None assert event.email == Email(address="john.doe@gmail.com") @@ -59,6 +70,7 @@ class UserAdded(BaseEvent): "address": "john.doe@gmail.com", }, "name": "John Doe", + "id": user.id, } ) diff --git a/tests/event_sourced_aggregates/test_event_association_with_aggregate.py b/tests/event_sourced_aggregates/test_event_association_with_aggregate.py index 3843a792..e4a66817 100644 --- a/tests/event_sourced_aggregates/test_event_association_with_aggregate.py +++ b/tests/event_sourced_aggregates/test_event_association_with_aggregate.py @@ -106,12 +106,9 @@ def test_that_trying_to_associate_an_event_with_multiple_aggregates_throws_an_er @pytest.mark.eventstore def test_an_unassociated_event_throws_error(test_domain): user = User.register(user_id="1", name="", email="") - user.raise_(UserArchived(user_id=user.user_id)) - with pytest.raises(ConfigurationError) as exc: - test_domain.repository_for(User).add(user) + user.raise_(UserArchived(user_id=user.user_id)) assert exc.value.args[0] == ( - "No stream name found for `UserArchived`. " - "Either specify an explicit stream name or associate the event with an aggregate." + "Event `UserArchived` is not associated with aggregate `User`" ) diff --git a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py index 40fd61e2..8a75fe1c 100644 --- a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py +++ b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py @@ -76,6 +76,8 @@ def rename_user(self, command: RenameNameTwice) -> None: def test_that_multiple_events_are_raised_per_aggregate_in_the_same_uow(test_domain): test_domain.register(User) + test_domain.register(Register, part_of=User) + test_domain.register(RenameNameTwice, part_of=User) test_domain.register(UserCommandHandler, part_of=User) test_domain.register(Registered, part_of=User) test_domain.register(Renamed, part_of=User) diff --git a/tests/event_sourced_repository/test_add_uow.py b/tests/event_sourced_repository/test_add_uow.py index d2902779..4d47b45f 100644 --- a/tests/event_sourced_repository/test_add_uow.py +++ b/tests/event_sourced_repository/test_add_uow.py @@ -20,6 +20,7 @@ class Registered(BaseEvent): @pytest.fixture(autouse=True) def register_elements(test_domain): test_domain.register(User) + test_domain.register(Registered, part_of=User) test_domain.init(traverse=False) diff --git a/tests/event_store/test_appending_commands.py b/tests/event_store/test_appending_commands.py index c7da2384..c31092a9 100644 --- a/tests/event_store/test_appending_commands.py +++ b/tests/event_store/test_appending_commands.py @@ -41,7 +41,7 @@ def test_command_submission(test_domain): test_domain.init(traverse=False) identifier = str(uuid4()) - test_domain.event_store.store.append( + test_domain.process( Register( user_id=identifier, email="john.doe@gmail.com", diff --git a/tests/event_store/test_appending_events.py b/tests/event_store/test_appending_events.py index dcc5fa90..605f2db7 100644 --- a/tests/event_store/test_appending_events.py +++ b/tests/event_store/test_appending_events.py @@ -4,23 +4,30 @@ import pytest -from protean import BaseEvent +from protean import BaseEvent, BaseEventSourcedAggregate from protean.fields.basic import Identifier from protean.utils.mixins import Message +class User(BaseEventSourcedAggregate): + user_id = Identifier(identifier=True) + + class UserLoggedIn(BaseEvent): user_id = Identifier(identifier=True) @pytest.mark.eventstore def test_appending_raw_events(test_domain): - test_domain.register(UserLoggedIn, stream_name="authentication") + test_domain.register(User, stream_name="authentication") + test_domain.register(UserLoggedIn, part_of=User) test_domain.init(traverse=False) identifier = str(uuid4()) - event = UserLoggedIn(user_id=identifier) - test_domain.event_store.store.append(event) + user = User(user_id=identifier) + user.raise_(UserLoggedIn(user_id=identifier)) + event = user._events[0] # Remember event for later comparison + test_domain.repository_for(User).add(user) messages = test_domain.event_store.store.read("authentication") diff --git a/tests/event_store/test_inline_event_processing_on_publish.py b/tests/event_store/test_inline_event_processing_on_publish.py index 0d1720fe..0390ccd2 100644 --- a/tests/event_store/test_inline_event_processing_on_publish.py +++ b/tests/event_store/test_inline_event_processing_on_publish.py @@ -9,7 +9,7 @@ import pytest -from protean import BaseEvent, BaseEventHandler, handle +from protean import BaseEvent, BaseEventHandler, BaseEventSourcedAggregate, handle from protean.fields import Identifier, String from protean.globals import current_domain @@ -21,6 +21,13 @@ def count_up(): counter += 1 +class User(BaseEventSourcedAggregate): + user_id = Identifier(identifier=True) + email = String() + name = String() + password_hash = String() + + class Registered(BaseEvent): user_id = Identifier() email = String() @@ -36,18 +43,26 @@ def registered(self, _: Registered) -> None: @pytest.mark.eventstore def test_inline_event_processing_on_publish_in_sync_mode(test_domain): - test_domain.register(Registered, stream_name="user") + test_domain.register(User, stream_name="user") + test_domain.register(Registered, part_of=User) test_domain.register(UserEventHandler, stream_name="user") test_domain.init(traverse=False) - current_domain.publish( + user = User( + user_id=str(uuid4()), + email="john.doe@example.com", + name="John Doe", + password_hash="hash", + ) + user.raise_( Registered( - user_id=str(uuid4()), - email="john.doe@example.com", - name="John Doe", - password_hash="hash", + user_id=user.user_id, + email=user.email, + name=user.name, + password_hash=user.password_hash, ) ) + current_domain.publish(user._events[0]) global counter assert counter == 1 diff --git a/tests/message/test_object_to_message.py b/tests/message/test_object_to_message.py index 59ef36e3..6f4f9892 100644 --- a/tests/message/test_object_to_message.py +++ b/tests/message/test_object_to_message.py @@ -82,10 +82,12 @@ def test_construct_message_from_event(test_domain): def test_construct_message_from_command(test_domain): identifier = str(uuid4()) command = Register(id=identifier, email="john.doe@gmail.com", name="John Doe") + test_domain.process(command) - message = Message.to_message(command) + messages = test_domain.event_store.store.read("user:command") + assert len(messages) == 1 - assert message is not None + message = messages[0] assert type(message) is Message # Verify Message Content @@ -93,7 +95,7 @@ def test_construct_message_from_command(test_domain): assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" assert message.data == command.to_dict() - assert message.time is None + assert message.time is not None # Verify Message Dict message_dict = message.to_dict() @@ -103,17 +105,19 @@ def test_construct_message_from_command(test_domain): message_dict["stream_name"] == f"{User.meta_.stream_name}:command-{identifier}" ) assert message_dict["data"] == command.to_dict() - assert message_dict["time"] is None + assert message_dict["time"] is not None -def test_construct_message_from_command_without_identifier(): +def test_construct_message_from_command_without_identifier(test_domain): """Test that a new UUID is used as identifier when there is no explicit identifier specified""" identifier = str(uuid4()) command = SendEmailCommand(to="john.doe@gmail.com", subject="Foo", content="Bar") + test_domain.process(command) - message = Message.to_message(command) + messages = test_domain.event_store.store.read("send_email:command") + assert len(messages) == 1 - assert message is not None + message = messages[0] assert type(message) is Message message_dict = message.to_dict() @@ -127,9 +131,10 @@ def test_construct_message_from_command_without_identifier(): pytest.fail("Command identifier is not a valid UUID") -def test_construct_message_from_either_event_or_command(): +def test_construct_message_from_either_event_or_command(test_domain): identifier = str(uuid4()) command = Register(id=identifier, email="john.doe@gmail.com", name="John Doe") + command = test_domain._enrich_command(command) message = Message.to_message(command) @@ -142,7 +147,9 @@ def test_construct_message_from_either_event_or_command(): assert message.metadata.kind == "COMMAND" assert message.data == command.to_dict() - event = Registered(id=identifier, email="john.doe@gmail.com", name="John Doe") + user = User(id=identifier, email="john.doe@example.com", name="John Doe") + user.raise_(Registered(id=identifier, email="john.doe@gmail.com", name="John Doe")) + event = user._events[-1] # This simulates the call by UnitOfWork message = Message.to_message(event) diff --git a/tests/message/test_origin_stream_name_in_metadata.py b/tests/message/test_origin_stream_name_in_metadata.py index 7a25f5d8..3d82c264 100644 --- a/tests/message/test_origin_stream_name_in_metadata.py +++ b/tests/message/test_origin_stream_name_in_metadata.py @@ -41,28 +41,35 @@ def user_id(): return str(uuid4()) -def register_command_message(user_id): - return Message.to_message( +@pytest.fixture +def register_command_message(user_id, test_domain): + enriched_command = test_domain._enrich_command( Register( user_id=user_id, email="john.doe@gmail.com", name="John Doe", ) ) + return Message.to_message(enriched_command) +@pytest.fixture def registered_event_message(user_id): - return Message.to_message( + user = User(id=user_id, email="john.doe@gmail.com", name="John Doe") + user.raise_( Registered( user_id=user_id, - email="john.doe@gmail.com", - name="John Doe", + email=user.email, + name=user.name, ) ) + return Message.to_message(user._events[0]) -def test_origin_stream_name_in_event_from_command_without_origin_stream_name(user_id): - g.message_in_context = register_command_message(user_id) +def test_origin_stream_name_in_event_from_command_without_origin_stream_name( + user_id, register_command_message +): + g.message_in_context = register_command_message event_message = Message.to_message( Registered( @@ -74,8 +81,10 @@ def test_origin_stream_name_in_event_from_command_without_origin_stream_name(use assert event_message.metadata.origin_stream_name is None -def test_origin_stream_name_in_event_from_command_with_origin_stream_name(user_id): - command_message = register_command_message(user_id) +def test_origin_stream_name_in_event_from_command_with_origin_stream_name( + user_id, register_command_message +): + command_message = register_command_message command_message.metadata = Metadata( command_message.metadata.to_dict(), origin_stream_name="foo" @@ -94,9 +103,9 @@ def test_origin_stream_name_in_event_from_command_with_origin_stream_name(user_i def test_origin_stream_name_in_aggregate_event_from_command_without_origin_stream_name( - user_id, + user_id, register_command_message ): - g.message_in_context = register_command_message(user_id) + g.message_in_context = register_command_message user = User( id=user_id, email="john.doe@gmail.com", @@ -115,9 +124,9 @@ def test_origin_stream_name_in_aggregate_event_from_command_without_origin_strea def test_origin_stream_name_in_aggregate_event_from_command_with_origin_stream_name( - user_id, + user_id, register_command_message ): - command_message = register_command_message(user_id) + command_message = register_command_message command_message.metadata = Metadata( command_message.metadata.to_dict(), origin_stream_name="foo" @@ -141,14 +150,17 @@ def test_origin_stream_name_in_aggregate_event_from_command_with_origin_stream_n assert event_message.metadata.origin_stream_name == "foo" -def test_origin_stream_name_in_command_from_event(user_id): - g.message_in_context = registered_event_message(user_id) - command_message = Message.to_message( - Register( - user_id=user_id, - email="john.doe@gmail.com", - name="John Doe", - ) +def test_origin_stream_name_in_command_from_event( + user_id, test_domain, registered_event_message +): + g.message_in_context = registered_event_message + command = Register( + user_id=user_id, + email="john.doe@gmail.com", + name="John Doe", ) + enriched_command = test_domain._enrich_command(command) + command_message = Message.to_message(enriched_command) + assert command_message.metadata.origin_stream_name == f"user-{user_id}" diff --git a/tests/server/test_engine_run.py b/tests/server/test_engine_run.py index a0f0700f..c130a68c 100644 --- a/tests/server/test_engine_run.py +++ b/tests/server/test_engine_run.py @@ -3,8 +3,9 @@ import pytest -from protean import BaseEvent, BaseEventHandler, Engine, handle +from protean import BaseAggregate, BaseEvent, BaseEventHandler, Engine, handle from protean.fields import Identifier +from protean.utils import EventProcessing counter = 0 @@ -14,6 +15,10 @@ def count_up(): counter += 1 +class User(BaseAggregate): + user_id = Identifier(identifier=True) + + class UserLoggedIn(BaseEvent): user_id = Identifier(identifier=True) @@ -40,14 +45,18 @@ def auto_set_and_close_loop(): @pytest.fixture(autouse=True) def register_elements(test_domain): - test_domain.register(UserLoggedIn, stream_name="authentication") + test_domain.config["event_processing"] = EventProcessing.ASYNC.value + test_domain.register(User, stream_name="authentication") + test_domain.register(UserLoggedIn, part_of=User) test_domain.register(UserEventHandler, stream_name="authentication") + test_domain.init(traverse=False) def test_processing_messages_on_start(test_domain): identifier = str(uuid4()) - event = UserLoggedIn(user_id=identifier) - test_domain.event_store.store.append(event) + user = User(user_id=identifier) + user.raise_(UserLoggedIn(user_id=identifier)) + test_domain.repository_for(User).add(user) engine = Engine(domain=test_domain, test_mode=True) engine.run() @@ -58,8 +67,9 @@ def test_processing_messages_on_start(test_domain): def test_that_read_position_is_updated_after_engine_run(test_domain): identifier = str(uuid4()) - event = UserLoggedIn(user_id=identifier) - test_domain.event_store.store.append(event) + user = User(user_id=identifier) + user.raise_(UserLoggedIn(user_id=identifier)) + test_domain.repository_for(User).add(user) messages = test_domain.event_store.store.read("authentication") assert len(messages) == 1 @@ -73,8 +83,9 @@ def test_that_read_position_is_updated_after_engine_run(test_domain): def test_processing_messages_from_beginning_the_first_time(test_domain): identifier = str(uuid4()) - event = UserLoggedIn(user_id=identifier) - test_domain.event_store.store.append(event) + user = User(user_id=identifier) + user.raise_(UserLoggedIn(user_id=identifier)) + test_domain.repository_for(User).add(user) engine = Engine(domain=test_domain, test_mode=True) engine.run() diff --git a/tests/subscription/test_read_position_updates.py b/tests/subscription/test_read_position_updates.py index 3ec55214..7d5d7ca7 100644 --- a/tests/subscription/test_read_position_updates.py +++ b/tests/subscription/test_read_position_updates.py @@ -69,7 +69,7 @@ def register_elements(test_domain): test_domain.register(Email) test_domain.register(Registered, part_of=User) test_domain.register(Activated, part_of=User) - test_domain.register(Sent, stream_name="email") + test_domain.register(Sent, part_of=Email) test_domain.register(UserEventHandler, part_of=User) test_domain.register(EmailEventHandler, stream_name="email") diff --git a/tests/test_brokers.py b/tests/test_brokers.py index 11b6b391..41d4d441 100644 --- a/tests/test_brokers.py +++ b/tests/test_brokers.py @@ -27,7 +27,7 @@ class PersonAdded(BaseEvent): class NotifySSOSubscriber(BaseSubscriber): def __call__(self, domain_event_dict): - print("Received Event: ", domain_event_dict) + pass class AddPersonCommand(BaseCommand): @@ -103,14 +103,16 @@ def test_that_brokers_can_be_registered_manually(self, test_domain): class TestEventPublish: @pytest.mark.eventstore def test_that_event_is_persisted_on_publish(self, mocker, test_domain): - test_domain.publish( + person = Person(first_name="John", last_name="Doe", age=24) + person.raise_( PersonAdded( - id="1234", - first_name="John", - last_name="Doe", - age=24, + id=person.id, + first_name=person.first_name, + last_name=person.last_name, + age=person.age, ) ) + test_domain.publish(person._events[0]) messages = test_domain.event_store.store.read("person") @@ -119,20 +121,28 @@ def test_that_event_is_persisted_on_publish(self, mocker, test_domain): @pytest.mark.eventstore def test_that_multiple_events_are_persisted_on_publish(self, mocker, test_domain): + person1 = Person(id="1234", first_name="John", last_name="Doe", age=24) + person1.raise_( + PersonAdded( + id=person1.id, + first_name=person1.first_name, + last_name=person1.last_name, + age=person1.age, + ) + ) + person2 = Person(id="1235", first_name="Jane", last_name="Doe", age=23) + person2.raise_( + PersonAdded( + id=person2.id, + first_name=person2.first_name, + last_name=person2.last_name, + age=person2.age, + ) + ) test_domain.publish( [ - PersonAdded( - id="1234", - first_name="John", - last_name="Doe", - age=24, - ), - PersonAdded( - id="1235", - first_name="Jane", - last_name="Doe", - age=25, - ), + person1._events[0], + person2._events[0], ] )