From 5709fff15ee7b3c817326d39005c59844dee2663 Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Thu, 11 Jul 2024 12:40:32 -0700 Subject: [PATCH] Manage handlers by event/command type We were basing the logic on retrieving handlers on the fully qualified name of the event or command class. This commit moves the logic to be on the `__type__` attribute of event/command classes. This is one of the steps necessary to be able to handle events/ messages from other systems, where we may not have an event registered in the domain. It is also necessary to be able to support multiple versions of event and command classes. --- src/protean/adapters/event_store/__init__.py | 8 ++-- src/protean/adapters/event_store/memory.py | 6 ++- src/protean/container.py | 2 +- src/protean/core/command.py | 14 ++++++- src/protean/core/command_handler.py | 37 ++++++++++--------- src/protean/core/entity.py | 1 + src/protean/core/event.py | 16 ++++++-- src/protean/core/event_handler.py | 11 ++++-- src/protean/core/unit_of_work.py | 4 +- src/protean/domain/__init__.py | 24 ++++++++---- src/protean/utils/mixins.py | 9 ++--- tests/adapters/broker/redis_broker/tests.py | 4 +- ...ams.py => test_aggregate_event_streams.py} | 0 tests/command/test_command_metadata.py | 25 +++++++++++++ ...st_handle_decorator_in_command_handlers.py | 15 ++++---- .../test_inline_command_processing.py | 25 ++++++++++--- .../test_retrieving_handlers_by_command.py | 2 +- tests/event/test_event_metadata.py | 2 + tests/event/test_event_payload.py | 2 + tests/event/tests.py | 1 + ...test_handle_decorator_in_event_handlers.py | 22 +++++------ ...t_raising_events_from_within_aggregates.py | 13 +++++-- ...tiple_events_for_one_aggregate_in_a_uow.py | 7 ++-- tests/event_store/test_reading_messages.py | 3 +- .../test_streams_initialization.py | 3 ++ tests/message/test_object_to_message.py | 18 ++++----- tests/server/test_command_handling.py | 3 +- .../server/test_event_handler_subscription.py | 3 ++ ...st_message_filtering_with_origin_stream.py | 2 +- .../subscription/test_no_message_filtering.py | 4 +- tests/test_commands.py | 8 ++-- .../test_inline_event_processing.py | 4 +- 32 files changed, 199 insertions(+), 99 deletions(-) rename tests/aggregate/events/{test_aggregate_streams.py => test_aggregate_event_streams.py} (100%) diff --git a/src/protean/adapters/event_store/__init__.py b/src/protean/adapters/event_store/__init__.py index fb2c3562..90b6a92d 100644 --- a/src/protean/adapters/event_store/__init__.py +++ b/src/protean/adapters/event_store/__init__.py @@ -104,7 +104,7 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: ) configured_stream_handlers = set() for stream_handler in stream_handlers: - if fqn(event.__class__) in stream_handler._handlers: + if event.__class__.__type__ in stream_handler._handlers: configured_stream_handlers.add(stream_handler) return set.union(configured_stream_handlers, all_stream_handlers) @@ -129,7 +129,7 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl for handler_cls in handler_classes: try: handler_method = next( - iter(handler_cls._handlers[fqn(command.__class__)]) + iter(handler_cls._handlers[command.__class__.__type__]) ) handler_methods.add((handler_cls, handler_method)) except StopIteration: @@ -149,7 +149,7 @@ def last_event_of_type( events = [ event for event in self.domain.event_store.store._read(stream_name) - if event["type"] == fqn(event_cls) + if event["type"] == event_cls.__type__ ] return Message.from_dict(events[-1]).to_object() if len(events) > 0 else None @@ -175,5 +175,5 @@ def events_of_type( return [ Message.from_dict(event).to_object() for event in self.domain.event_store.store._read(stream_name) - if event["type"] == fqn(event_cls) + if event["type"] == event_cls.__type__ ] diff --git a/src/protean/adapters/event_store/memory.py b/src/protean/adapters/event_store/memory.py index aa558b5c..677f705f 100644 --- a/src/protean/adapters/event_store/memory.py +++ b/src/protean/adapters/event_store/memory.py @@ -76,7 +76,11 @@ def read( if stream_name == "$all": pass # Don't filter on stream name elif self.is_category(stream_name): - q = q.filter(stream_name__contains=stream_name) + # If filtering on category, ensure the supplied stream name + # is the only thing in the category. + # Eg. If stream_name is 'user', then only 'user' should be in the category, + # and not even `user:command` + q = q.filter(stream_name__contains=f"{stream_name}-") else: q = q.filter(stream_name=stream_name) diff --git a/src/protean/container.py b/src/protean/container.py index 622a559f..fd7e7357 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -14,7 +14,6 @@ ValidationError, ) from protean.fields import Auto, Field, FieldBase, ValueObject -from protean.globals import current_domain from protean.reflection import id_field from protean.utils import generate_identity @@ -416,6 +415,7 @@ def raise_(self, event, fact_event=False) -> None: _metadata={ "id": (f"{stream_name}-{self._version}"), "type": event._metadata.type, + "fqn": event._metadata.fqn, "kind": event._metadata.kind, "stream_name": stream_name, "origin_stream_name": event._metadata.origin_stream_name, diff --git a/src/protean/core/command.py b/src/protean/core/command.py index 4fd7621e..10e26b34 100644 --- a/src/protean/core/command.py +++ b/src/protean/core/command.py @@ -9,7 +9,7 @@ from protean.fields import Field, ValueObject from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields, fields -from protean.utils import DomainObjects, derive_element_class +from protean.utils import DomainObjects, derive_element_class, fqn class BaseCommand(BaseContainer, OptionsMixin): @@ -58,6 +58,7 @@ def __init__(self, *args, **kwargs): self._metadata = Metadata( self._metadata.to_dict(), # Template kind="COMMAND", + fqn=fqn(self.__class__), origin_stream_name=origin_stream_name, version=version, ) @@ -116,6 +117,17 @@ def __track_id_field(subclass): # No Identity fields declared pass + def to_dict(self): + """Return data as a dictionary. + + We need to override this method in Command, because `to_dict()` of `BaseContainer` + eliminates `_metadata`. + """ + return { + field_name: field_obj.as_dict(getattr(self, field_name, None)) + for field_name, field_obj in fields(self).items() + } + def command_factory(element_cls, domain, **opts): element_cls = derive_element_class(element_cls, BaseCommand, **opts) diff --git a/src/protean/core/command_handler.py b/src/protean/core/command_handler.py index b08a2cab..2d024bd9 100644 --- a/src/protean/core/command_handler.py +++ b/src/protean/core/command_handler.py @@ -3,7 +3,7 @@ from protean.container import Element, OptionsMixin from protean.core.command import BaseCommand from protean.exceptions import IncorrectUsageError, NotSupportedError -from protean.utils import DomainObjects, derive_element_class, fully_qualified_name +from protean.utils import DomainObjects, derive_element_class from protean.utils.mixins import HandlerMixin @@ -45,23 +45,6 @@ def command_handler_factory(element_cls, domain, **opts): if not ( method_name.startswith("__") and method_name.endswith("__") ) and hasattr(method, "_target_cls"): - # Do not allow multiple handlers per command - if ( - fully_qualified_name(method._target_cls) in element_cls._handlers - and len( - element_cls._handlers[fully_qualified_name(method._target_cls)] - ) - != 0 - ): - raise NotSupportedError( - f"Command {method._target_cls.__name__} cannot be handled by multiple handlers" - ) - - # `_handlers` maps the command to its handler method - element_cls._handlers[fully_qualified_name(method._target_cls)].add( - method - ) - # Throw error if target_cls is not a Command if not inspect.isclass(method._target_cls) or not issubclass( method._target_cls, BaseCommand @@ -96,6 +79,24 @@ def command_handler_factory(element_cls, domain, **opts): } ) + command_type = ( + method._target_cls.__type__ + if issubclass(method._target_cls, BaseCommand) + else method._target_cls + ) + + # Do not allow multiple handlers per command + if ( + command_type in element_cls._handlers + and len(element_cls._handlers[command_type]) != 0 + ): + raise NotSupportedError( + f"Command {method._target_cls.__name__} cannot be handled by multiple handlers" + ) + + # `_handlers` maps the command to its handler method + element_cls._handlers[command_type].add(method) + # Associate Command with the handler's stream # Order of preference: # 1. Stream name defined in command diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index 899d03e9..1acc7fd3 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -469,6 +469,7 @@ def raise_(self, event) -> None: _metadata={ "id": (f"{stream_name}-{aggregate_version}.{event_number}"), "type": event._metadata.type, + "fqn": event._metadata.fqn, "kind": event._metadata.kind, "stream_name": stream_name, "origin_stream_name": event._metadata.origin_stream_name, diff --git a/src/protean/core/event.py b/src/protean/core/event.py index 84e583f7..947b57d2 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -12,20 +12,26 @@ from protean.fields import DateTime, Field, Integer, String, ValueObject from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields, fields -from protean.utils import DomainObjects, derive_element_class +from protean.utils import DomainObjects, derive_element_class, fqn logger = logging.getLogger(__name__) class Metadata(BaseValueObject): - # Unique identifier of the Event - # Format is .... + # Unique identifier of the event/command + # + # FIXME Fix the format documentation + # Event Format is .... + # Command Format is .. id = String() # Type of the event # Format is .. type = String() + # Fully Qualified Name of the event/command + fqn = String() + # Kind of the object # Can be one of "EVENT", "COMMAND" kind = String() @@ -40,9 +46,10 @@ class Metadata(BaseValueObject): timestamp = DateTime(default=lambda: datetime.now(timezone.utc)) # Version of the event - # Can be overridden with `__version__` class attr in Event class definition + # Can be overridden with `__version__` class attr in event/command class definition version = String(default="v1") + # Applies to Events only # Sequence of the event in the aggregate # This is the version of the aggregate as it will be *after* persistence. # @@ -146,6 +153,7 @@ def __init__(self, *args, **kwargs): self._metadata.to_dict(), # Template from old Metadata type=self.__class__.__type__, kind="EVENT", + fqn=fqn(self.__class__), origin_stream_name=origin_stream_name, version=self.__class__.__version__, # Was set in `__init_subclass__` ) diff --git a/src/protean/core/event_handler.py b/src/protean/core/event_handler.py index 56560775..c3b3cfbd 100644 --- a/src/protean/core/event_handler.py +++ b/src/protean/core/event_handler.py @@ -2,8 +2,9 @@ import logging from protean.container import Element, OptionsMixin +from protean.core.event import BaseEvent from protean.exceptions import IncorrectUsageError, NotSupportedError -from protean.utils import DomainObjects, derive_element_class, fully_qualified_name +from protean.utils import DomainObjects, derive_element_class from protean.utils.mixins import HandlerMixin logger = logging.getLogger(__name__) @@ -59,8 +60,12 @@ def event_handler_factory(element_cls, domain, **opts): # can have only one `$any` handler method. element_cls._handlers["$any"] = {method} else: - element_cls._handlers[fully_qualified_name(method._target_cls)].add( - method + # Target could be an event or an event type string + event_type = ( + method._target_cls.__type__ + if issubclass(method._target_cls, BaseEvent) + else method._target_cls ) + element_cls._handlers[event_type].add(method) return element_cls diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index 50151fc0..0ce03109 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -8,7 +8,7 @@ ) from protean.globals import _uow_context_stack, current_domain from protean.reflection import id_field -from protean.utils import EventProcessing, fqn +from protean.utils import EventProcessing logger = logging.getLogger(__name__) @@ -90,7 +90,7 @@ def commit(self): # noqa: C901 handler_classes = current_domain.handlers_for(event) for handler_cls in handler_classes: handler_methods = ( - handler_cls._handlers[fqn(event.__class__)] + handler_cls._handlers[event.__class__.__type__] or handler_cls._handlers["$any"] ) diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index d665f11c..cb8ab43f 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -910,7 +910,7 @@ def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None: handler_classes = self.handlers_for(event) for handler_cls in handler_classes: handler_methods = ( - handler_cls._handlers[fqn(event.__class__)] + handler_cls._handlers[event.__class__.__type__] or handler_cls._handlers["$any"] ) @@ -939,11 +939,9 @@ def _enrich_command(self, command: BaseCommand) -> BaseCommand: command_with_metadata = command.__class__( command.to_dict(), _metadata={ - "id": (str(uuid4())), - "type": ( - f"{command.meta_.part_of.__class__.__name__}.{command.__class__.__name__}." - f"{command._metadata.version}" - ), + "id": identifier, # FIXME Double check command ID format and construction + "type": command.__class__.__type__, + "fqn": command._metadata.fqn, "kind": "EVENT", "stream_name": stream_name, "origin_stream_name": origin_stream_name, @@ -976,6 +974,18 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A Returns: Optional[Any]: Returns either the command handler's return value or nothing, based on preference. """ + if ( + fqn(command.__class__) + not in self.registry._elements[DomainObjects.COMMAND.value] + ): + raise IncorrectUsageError( + { + "element": [ + f"Element {command.__class__.__name__} is not registered in domain {self.name}" + ] + } + ) + command_with_metadata = self._enrich_command(command) position = self.event_store.store.append(command_with_metadata) @@ -986,7 +996,7 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A handler_class = self.command_handler_for(command) if handler_class: handler_method = next( - iter(handler_class._handlers[fqn(command.__class__)]) + iter(handler_class._handlers[command.__class__.__type__]) ) handler_method(handler_class(), command) diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index aaa2968f..1bf0fc04 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -13,7 +13,6 @@ from protean.core.unit_of_work import UnitOfWork from protean.exceptions import ConfigurationError, InvalidDataError from protean.globals import current_domain -from protean.utils import fully_qualified_name logger = logging.getLogger(__name__) @@ -82,9 +81,9 @@ def from_dict(cls, message: Dict) -> Message: def to_object(self) -> Union[BaseEvent, BaseCommand]: if self.metadata.kind == MessageType.EVENT.value: - element_record = current_domain.registry.events[self.type] + element_record = current_domain.registry.events[self.metadata.fqn] elif self.metadata.kind == MessageType.COMMAND.value: - element_record = current_domain.registry.commands[self.type] + element_record = current_domain.registry.commands[self.metadata.fqn] else: raise InvalidDataError( {"_message": ["Message type is not supported for deserialization"]} @@ -110,7 +109,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: return cls( stream_name=message_object._metadata.stream_name, - type=fully_qualified_name(message_object.__class__), + type=message_object.__class__.__type__, data=message_object.to_dict(), metadata=message_object._metadata, expected_version=expected_version, @@ -162,7 +161,7 @@ def __init_subclass__(subclass) -> None: @classmethod def _handle(cls, message: Message) -> None: # Use Event-specific handlers if available, or fallback on `$any` if defined - handlers = cls._handlers[message.type] or cls._handlers["$any"] + handlers = cls._handlers[message.metadata.type] or cls._handlers["$any"] for handler_method in handlers: handler_method(cls(), message.to_object()) diff --git a/tests/adapters/broker/redis_broker/tests.py b/tests/adapters/broker/redis_broker/tests.py index 3838a9be..9a5e6b7e 100644 --- a/tests/adapters/broker/redis_broker/tests.py +++ b/tests/adapters/broker/redis_broker/tests.py @@ -60,7 +60,9 @@ def test_event_message_structure(self, test_domain): "metadata", ] ) - assert json_message["type"] == "redis_broker.elements.PersonAdded" + assert ( + json_message["type"] == "Redis Broker Tests.PersonAdded.v1" + ) # FIXME Normalize Domain Name assert json_message["metadata"]["kind"] == "EVENT" diff --git a/tests/aggregate/events/test_aggregate_streams.py b/tests/aggregate/events/test_aggregate_event_streams.py similarity index 100% rename from tests/aggregate/events/test_aggregate_streams.py rename to tests/aggregate/events/test_aggregate_event_streams.py diff --git a/tests/command/test_command_metadata.py b/tests/command/test_command_metadata.py index 4d594a2c..60783b6c 100644 --- a/tests/command/test_command_metadata.py +++ b/tests/command/test_command_metadata.py @@ -5,6 +5,7 @@ from protean import BaseAggregate, BaseCommand from protean.fields import Identifier, String from protean.reflection import fields +from protean.utils import fqn class User(BaseAggregate): @@ -56,3 +57,27 @@ class Login(BaseCommand): command = Login(user_id=str(uuid4())) assert command._metadata.version == "v2" + + +def test_command_metadata(test_domain): + identifier = str(uuid4()) + command = test_domain._enrich_command(Login(user_id=identifier)) + + assert ( + command.to_dict() + == { + "_metadata": { + "id": f"{identifier}", # FIXME Double-check command identifier format and construction + "type": "Test.Login.v1", + "fqn": fqn(Login), + "kind": "COMMAND", + "stream_name": f"user:command-{identifier}", + "origin_stream_name": None, + "timestamp": str(command._metadata.timestamp), + "version": "v1", + "sequence_id": None, + "payload_hash": command._metadata.payload_hash, + }, + "user_id": command.user_id, + } + ) diff --git a/tests/command_handler/test_handle_decorator_in_command_handlers.py b/tests/command_handler/test_handle_decorator_in_command_handlers.py index 3c5a6e85..26a97e66 100644 --- a/tests/command_handler/test_handle_decorator_in_command_handlers.py +++ b/tests/command_handler/test_handle_decorator_in_command_handlers.py @@ -4,7 +4,6 @@ from protean.core.command_handler import BaseCommandHandler from protean.exceptions import NotSupportedError from protean.fields import Identifier, String -from protean.utils import fully_qualified_name class User(BaseAggregate): @@ -32,7 +31,7 @@ def register(self, command: Register) -> None: test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) - assert fully_qualified_name(Register) in UserCommandHandlers._handlers + assert Register.__type__ in UserCommandHandlers._handlers def test_that_multiple_handlers_can_be_recorded_against_command_handler(test_domain): @@ -55,19 +54,19 @@ def update_billing_address(self, event: ChangeAddress) -> None: assert all( handle_name in UserCommandHandlers._handlers for handle_name in [ - fully_qualified_name(Register), - fully_qualified_name(ChangeAddress), + Register.__type__, + ChangeAddress.__type__, ] ) - assert len(UserCommandHandlers._handlers[fully_qualified_name(Register)]) == 1 - assert len(UserCommandHandlers._handlers[fully_qualified_name(ChangeAddress)]) == 1 + assert len(UserCommandHandlers._handlers[Register.__type__]) == 1 + assert len(UserCommandHandlers._handlers[ChangeAddress.__type__]) == 1 assert ( - next(iter(UserCommandHandlers._handlers[fully_qualified_name(Register)])) + next(iter(UserCommandHandlers._handlers[Register.__type__])) == UserCommandHandlers.register ) assert ( - next(iter(UserCommandHandlers._handlers[fully_qualified_name(ChangeAddress)])) + next(iter(UserCommandHandlers._handlers[ChangeAddress.__type__])) == UserCommandHandlers.update_billing_address ) diff --git a/tests/command_handler/test_inline_command_processing.py b/tests/command_handler/test_inline_command_processing.py index baf3ed32..f7da4f4e 100644 --- a/tests/command_handler/test_inline_command_processing.py +++ b/tests/command_handler/test_inline_command_processing.py @@ -1,7 +1,10 @@ from uuid import uuid4 +import pytest + from protean import BaseAggregate, BaseCommand, handle from protean.core.command_handler import BaseCommandHandler +from protean.exceptions import IncorrectUsageError from protean.fields import Identifier, String from protean.utils import CommandProcessing @@ -19,6 +22,10 @@ class Register(BaseCommand): email = String() +class Login(BaseCommand): + user_id = Identifier() + + class UserCommandHandlers(BaseCommandHandler): @handle(Register) def register(self, event: Register) -> None: @@ -26,24 +33,32 @@ def register(self, event: Register) -> None: counter += 1 -def test_that_command_can_be_processed_inline(test_domain): +@pytest.fixture(autouse=True) +def register(test_domain): test_domain.register(User) test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) - assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value - test_domain.process(Register(user_id=str(uuid4()), email="john.doe@gmail.com")) - assert counter == 1 +def test_unregistered_command_raises_error(test_domain): + with pytest.raises(IncorrectUsageError): + test_domain.process(Login(user_id=str(uuid4()))) -def test_that_command_is_persisted_in_message_store(test_domain): +def test_that_command_can_be_processed_inline(test_domain): test_domain.register(User) test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) + assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value + + test_domain.process(Register(user_id=str(uuid4()), email="john.doe@gmail.com")) + assert counter == 1 + + +def test_that_command_is_persisted_in_message_store(test_domain): identifier = str(uuid4()) test_domain.process(Register(user_id=identifier, email="john.doe@gmail.com")) diff --git a/tests/command_handler/test_retrieving_handlers_by_command.py b/tests/command_handler/test_retrieving_handlers_by_command.py index 3e4962f2..c08e9013 100644 --- a/tests/command_handler/test_retrieving_handlers_by_command.py +++ b/tests/command_handler/test_retrieving_handlers_by_command.py @@ -78,7 +78,7 @@ def test_for_no_errors_when_no_handler_method_has_not_been_defined_for_a_command test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) - assert test_domain.command_handler_for(ChangeAddress) is None + assert test_domain.command_handler_for(ChangeAddress()) is None def test_retrieving_handlers_for_unknown_command(test_domain): diff --git a/tests/event/test_event_metadata.py b/tests/event/test_event_metadata.py index 4d90cd7f..b6f9c5ce 100644 --- a/tests/event/test_event_metadata.py +++ b/tests/event/test_event_metadata.py @@ -7,6 +7,7 @@ from protean.fields import String, ValueObject from protean.fields.basic import Identifier from protean.reflection import fields +from protean.utils import fqn class User(BaseEventSourcedAggregate): @@ -102,6 +103,7 @@ def test_event_metadata(): "_metadata": { "id": f"user-{user.id}-0", "type": "Test.UserLoggedIn.v1", + "fqn": fqn(UserLoggedIn), "kind": "EVENT", "stream_name": f"user-{user.id}", "origin_stream_name": None, diff --git a/tests/event/test_event_payload.py b/tests/event/test_event_payload.py index 4ff0341e..2bf83ba0 100644 --- a/tests/event/test_event_payload.py +++ b/tests/event/test_event_payload.py @@ -5,6 +5,7 @@ from protean import BaseEvent, BaseEventSourcedAggregate from protean.fields import String from protean.fields.basic import Identifier +from protean.utils import fqn class User(BaseEventSourcedAggregate): @@ -38,6 +39,7 @@ def test_event_payload(): "_metadata": { "id": f"user-{user_id}-0", "type": "Test.UserLoggedIn.v1", + "fqn": fqn(UserLoggedIn), "kind": "EVENT", "stream_name": f"user-{user_id}", "origin_stream_name": None, diff --git a/tests/event/tests.py b/tests/event/tests.py index 0eb37942..759c2271 100644 --- a/tests/event/tests.py +++ b/tests/event/tests.py @@ -58,6 +58,7 @@ class UserAdded(BaseEvent): "_metadata": { "id": None, # ID is none because the event is not being raised in the proper way (with `_raise`) "type": "Test.UserAdded.v1", + "fqn": fully_qualified_name(UserAdded), "kind": "EVENT", "stream_name": None, # Type is none here because of the same reason as above "origin_stream_name": None, diff --git a/tests/event_handler/test_handle_decorator_in_event_handlers.py b/tests/event_handler/test_handle_decorator_in_event_handlers.py index ba0e308e..068726af 100644 --- a/tests/event_handler/test_handle_decorator_in_event_handlers.py +++ b/tests/event_handler/test_handle_decorator_in_event_handlers.py @@ -2,7 +2,6 @@ from protean import BaseAggregate, BaseEvent, BaseEventHandler, handle from protean.fields import Identifier, String -from protean.utils import fully_qualified_name class User(BaseAggregate): @@ -27,9 +26,10 @@ def send_email_notification(self, event: Registered) -> None: pass test_domain.register(User) + test_domain.register(Registered, part_of=User) test_domain.register(UserEventHandlers, part_of=User) - assert fully_qualified_name(Registered) in UserEventHandlers._handlers + assert Registered.__type__ in UserEventHandlers._handlers def test_that_multiple_handlers_can_be_recorded_against_event_handler(test_domain): @@ -43,25 +43,27 @@ def updated_billing_address(self, event: AddressChanged) -> None: pass test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(AddressChanged, part_of=User) test_domain.register(UserEventHandlers, part_of=User) assert len(UserEventHandlers._handlers) == 2 assert all( handle_name in UserEventHandlers._handlers for handle_name in [ - fully_qualified_name(Registered), - fully_qualified_name(AddressChanged), + Registered.__type__, + AddressChanged.__type__, ] ) - assert len(UserEventHandlers._handlers[fully_qualified_name(Registered)]) == 1 - assert len(UserEventHandlers._handlers[fully_qualified_name(AddressChanged)]) == 1 + assert len(UserEventHandlers._handlers[Registered.__type__]) == 1 + assert len(UserEventHandlers._handlers[AddressChanged.__type__]) == 1 assert ( - next(iter(UserEventHandlers._handlers[fully_qualified_name(Registered)])) + next(iter(UserEventHandlers._handlers[Registered.__type__])) == UserEventHandlers.send_email_notification ) assert ( - next(iter(UserEventHandlers._handlers[fully_qualified_name(AddressChanged)])) + next(iter(UserEventHandlers._handlers[AddressChanged.__type__])) == UserEventHandlers.updated_billing_address ) @@ -81,9 +83,7 @@ def provision_user_accounts(self, event: Registered) -> None: assert len(UserEventHandlers._handlers) == 1 # Against Registered Event - handlers_for_registered = UserEventHandlers._handlers[ - fully_qualified_name(Registered) - ] + handlers_for_registered = UserEventHandlers._handlers[Registered.__type__] assert len(handlers_for_registered) == 2 assert all( handler_method in handlers_for_registered diff --git a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py index 92460ee3..72864a13 100644 --- a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py +++ b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py @@ -9,11 +9,10 @@ from protean.core.event_sourced_aggregate import apply from protean.fields import Identifier, String from protean.globals import current_domain -from protean.utils import fqn class Register(BaseCommand): - id = Identifier() + id = Identifier(identifier=True) email = String() name = String() password_hash = String() @@ -75,7 +74,7 @@ def register_elements(test_domain): @pytest.mark.eventstore def test_that_events_can_be_raised_from_within_aggregates(test_domain): identifier = str(uuid4()) - UserCommandHandler().register_user( + test_domain.process( Register( id=identifier, email="john.doe@example.com", @@ -88,4 +87,10 @@ def test_that_events_can_be_raised_from_within_aggregates(test_domain): assert len(messages) == 1 assert messages[0]["stream_name"] == f"user-{identifier}" - assert messages[0]["type"] == f"{fqn(Registered)}" + assert messages[0]["type"] == Registered.__type__ + + messages = test_domain.event_store.store._read("user:command") + + assert len(messages) == 1 + assert messages[0]["stream_name"] == f"user:command-{identifier}" + assert messages[0]["type"] == Register.__type__ diff --git a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py index 8a75fe1c..c3548e80 100644 --- a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py +++ b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py @@ -14,7 +14,6 @@ ) from protean.fields import Identifier, String from protean.globals import current_domain -from protean.utils import fqn class Register(BaseCommand): @@ -100,6 +99,6 @@ def test_that_multiple_events_are_raised_per_aggregate_in_the_same_uow(test_doma messages = test_domain.event_store.store._read("user") assert len(messages) == 3 - assert messages[0]["type"] == f"{fqn(Registered)}" - assert messages[1]["type"] == f"{fqn(Renamed)}" - assert messages[2]["type"] == f"{fqn(Renamed)}" + assert messages[0]["type"] == Registered.__type__ + assert messages[1]["type"] == Renamed.__type__ + assert messages[2]["type"] == Renamed.__type__ diff --git a/tests/event_store/test_reading_messages.py b/tests/event_store/test_reading_messages.py index 38d58281..999a43b9 100644 --- a/tests/event_store/test_reading_messages.py +++ b/tests/event_store/test_reading_messages.py @@ -7,7 +7,6 @@ from protean import BaseEvent, BaseEventSourcedAggregate from protean.fields import String from protean.fields.basic import Identifier -from protean.utils import fqn from protean.utils.mixins import Message @@ -136,5 +135,5 @@ def test_reading_messages_by_category(test_domain, activated_user): def test_reading_last_message(test_domain, renamed_user): # Reading by stream message = test_domain.event_store.store.read_last_message(f"user-{renamed_user.id}") - assert message.type == fqn(Renamed) + assert message.type == Renamed.__type__ assert message.data["name"] == "John Doe 9" diff --git a/tests/event_store/test_streams_initialization.py b/tests/event_store/test_streams_initialization.py index 03b5ff10..2b648a96 100644 --- a/tests/event_store/test_streams_initialization.py +++ b/tests/event_store/test_streams_initialization.py @@ -57,7 +57,10 @@ def record_sent_email(self, event: Sent) -> None: @pytest.fixture(autouse=True) def register(test_domain): test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) test_domain.register(Email) + test_domain.register(Sent, part_of=Email) test_domain.register(UserEventHandler, part_of=User) test_domain.register(EmailEventHandler, part_of=Email) test_domain.init(traverse=False) diff --git a/tests/message/test_object_to_message.py b/tests/message/test_object_to_message.py index c208a464..b8b6c7f7 100644 --- a/tests/message/test_object_to_message.py +++ b/tests/message/test_object_to_message.py @@ -5,7 +5,6 @@ from protean import BaseCommand, BaseEvent, BaseEventSourcedAggregate from protean.exceptions import ConfigurationError from protean.fields import Identifier, String -from protean.utils import fully_qualified_name from protean.utils.mixins import Message @@ -64,7 +63,7 @@ def test_construct_message_from_event(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Registered) + assert message.type == Registered.__type__ assert message.stream_name == f"{User.meta_.stream_name}-{identifier}" assert message.metadata.kind == "EVENT" assert message.data == user._events[-1].to_dict() @@ -74,7 +73,7 @@ def test_construct_message_from_event(test_domain): # Verify Message Dict message_dict = message.to_dict() - assert message_dict["type"] == fully_qualified_name(Registered) + assert message_dict["type"] == Registered.__type__ assert message_dict["metadata"]["kind"] == "EVENT" assert message_dict["stream_name"] == f"{User.meta_.stream_name}-{identifier}" assert message_dict["data"] == user._events[-1].to_dict() @@ -87,6 +86,7 @@ def test_construct_message_from_event(test_domain): def test_construct_message_from_command(test_domain): identifier = str(uuid4()) command = Register(id=identifier, email="john.doe@gmail.com", name="John Doe") + command_with_metadata = test_domain._enrich_command(command) test_domain.process(command) messages = test_domain.event_store.store.read("user:command") @@ -96,20 +96,20 @@ def test_construct_message_from_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Register) + assert message.type == Register.__type__ assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" - assert message.data == command.to_dict() + assert message.data == command_with_metadata.to_dict() assert message.time is not None # Verify Message Dict message_dict = message.to_dict() - assert message_dict["type"] == fully_qualified_name(Register) + assert message_dict["type"] == Register.__type__ assert message_dict["metadata"]["kind"] == "COMMAND" assert ( message_dict["stream_name"] == f"{User.meta_.stream_name}:command-{identifier}" ) - assert message_dict["data"] == command.to_dict() + assert message_dict["data"] == command_with_metadata.to_dict() assert message_dict["time"] is not None @@ -147,7 +147,7 @@ def test_construct_message_from_either_event_or_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Register) + assert message.type == Register.__type__ assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" assert message.data == command.to_dict() @@ -163,7 +163,7 @@ def test_construct_message_from_either_event_or_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Registered) + assert message.type == Registered.__type__ assert message.stream_name == f"{User.meta_.stream_name}-{identifier}" assert message.metadata.kind == "EVENT" assert message.data == event.to_dict() diff --git a/tests/server/test_command_handling.py b/tests/server/test_command_handling.py index bc532241..df1fba98 100644 --- a/tests/server/test_command_handling.py +++ b/tests/server/test_command_handling.py @@ -52,7 +52,8 @@ async def test_handler_invocation(test_domain): user_id=identifier, email="john.doe@example.com", ) - message = Message.to_message(command) + enriched_command = test_domain._enrich_command(command) + message = Message.to_message(enriched_command) engine = Engine(domain=test_domain, test_mode=True) await engine.handle_message(UserCommandHandler, message) diff --git a/tests/server/test_event_handler_subscription.py b/tests/server/test_event_handler_subscription.py index 3180b8f9..a503858c 100644 --- a/tests/server/test_event_handler_subscription.py +++ b/tests/server/test_event_handler_subscription.py @@ -73,6 +73,8 @@ def setup_event_loop(): def test_event_subscriptions(test_domain): test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) test_domain.register(UserEventHandler, part_of=User) engine = Engine(test_domain, test_mode=True) @@ -83,6 +85,7 @@ def test_event_subscriptions(test_domain): def test_origin_stream_name_in_subscription(test_domain): test_domain.register(User) + test_domain.register(Sent, part_of=User) test_domain.register(EmailEventHandler, part_of=User, source_stream="email") engine = Engine(test_domain, test_mode=True) diff --git a/tests/subscription/test_message_filtering_with_origin_stream.py b/tests/subscription/test_message_filtering_with_origin_stream.py index be3595c7..6e67f55d 100644 --- a/tests/subscription/test_message_filtering_with_origin_stream.py +++ b/tests/subscription/test_message_filtering_with_origin_stream.py @@ -113,4 +113,4 @@ async def test_message_filtering_for_event_handlers_with_defined_origin_stream( ) assert len(filtered_messages) == 1 - assert filtered_messages[0].type == fqn(Sent) + assert filtered_messages[0].type == Sent.__type__ diff --git a/tests/subscription/test_no_message_filtering.py b/tests/subscription/test_no_message_filtering.py index fffa869d..7065cef5 100644 --- a/tests/subscription/test_no_message_filtering.py +++ b/tests/subscription/test_no_message_filtering.py @@ -112,5 +112,5 @@ async def test_no_filtering_for_event_handlers_without_defined_origin_stream( ) assert len(filtered_messages) == 3 - assert filtered_messages[0].type == fqn(Registered) - assert filtered_messages[2].type == fqn(Sent) + assert filtered_messages[0].type == Registered.__type__ + assert filtered_messages[2].type == Sent.__type__ diff --git a/tests/test_commands.py b/tests/test_commands.py index b37aa540..9b7ac0eb 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -15,7 +15,7 @@ class User(BaseAggregate): class UserRegistrationCommand(BaseCommand): - email = String(required=True, max_length=250) + email = String(required=True, identifier=True, max_length=250) username = String(required=True, max_length=50) password = String(required=True, max_length=255) age = Integer(default=21) @@ -83,7 +83,9 @@ def test_two_commands_with_equal_values_are_considered_equal(self): email="john.doe@gmail.com", username="john.doe", password="secret1!" ) - assert command1 == command2 + # The commands themselves will not be equal because their metadata, like + # timestamp, can differ. But their payloads should be equal + assert command1.payload == command2.payload def test_that_commands_are_immutable(self): command = UserRegistrationCommand( @@ -96,7 +98,7 @@ def test_output_to_dict(self): command = UserRegistrationCommand( email="john.doe@gmail.com", username="john.doe", password="secret1!" ) - assert command.to_dict() == { + assert command.payload == { "email": "john.doe@gmail.com", "username": "john.doe", "password": "secret1!", diff --git a/tests/unit_of_work/test_inline_event_processing.py b/tests/unit_of_work/test_inline_event_processing.py index f54eeca0..b8fb0a8c 100644 --- a/tests/unit_of_work/test_inline_event_processing.py +++ b/tests/unit_of_work/test_inline_event_processing.py @@ -93,13 +93,15 @@ def count_registrations(self, _: BaseEventHandler) -> None: @pytest.mark.eventstore def test_inline_event_processing_in_sync_mode(test_domain): test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(Registered, part_of=User) + test_domain.register(UserCommandHandler, part_of=User) test_domain.register(UserEventHandler, part_of=User) test_domain.register(UserMetrics, part_of=User) test_domain.init(traverse=False) identifier = str(uuid4()) - UserCommandHandler().register_user( + test_domain.process( Register( user_id=identifier, email="john.doe@example.com",