diff --git a/src/protean/adapters/event_store/__init__.py b/src/protean/adapters/event_store/__init__.py index fb2c3562..90b6a92d 100644 --- a/src/protean/adapters/event_store/__init__.py +++ b/src/protean/adapters/event_store/__init__.py @@ -104,7 +104,7 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]: ) configured_stream_handlers = set() for stream_handler in stream_handlers: - if fqn(event.__class__) in stream_handler._handlers: + if event.__class__.__type__ in stream_handler._handlers: configured_stream_handlers.add(stream_handler) return set.union(configured_stream_handlers, all_stream_handlers) @@ -129,7 +129,7 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl for handler_cls in handler_classes: try: handler_method = next( - iter(handler_cls._handlers[fqn(command.__class__)]) + iter(handler_cls._handlers[command.__class__.__type__]) ) handler_methods.add((handler_cls, handler_method)) except StopIteration: @@ -149,7 +149,7 @@ def last_event_of_type( events = [ event for event in self.domain.event_store.store._read(stream_name) - if event["type"] == fqn(event_cls) + if event["type"] == event_cls.__type__ ] return Message.from_dict(events[-1]).to_object() if len(events) > 0 else None @@ -175,5 +175,5 @@ def events_of_type( return [ Message.from_dict(event).to_object() for event in self.domain.event_store.store._read(stream_name) - if event["type"] == fqn(event_cls) + if event["type"] == event_cls.__type__ ] diff --git a/src/protean/adapters/event_store/memory.py b/src/protean/adapters/event_store/memory.py index aa558b5c..677f705f 100644 --- a/src/protean/adapters/event_store/memory.py +++ b/src/protean/adapters/event_store/memory.py @@ -76,7 +76,11 @@ def read( if stream_name == "$all": pass # Don't filter on stream name elif self.is_category(stream_name): - q = q.filter(stream_name__contains=stream_name) + # If filtering on category, ensure the supplied stream name + # is the only thing in the category. + # Eg. If stream_name is 'user', then only 'user' should be in the category, + # and not even `user:command` + q = q.filter(stream_name__contains=f"{stream_name}-") else: q = q.filter(stream_name=stream_name) diff --git a/src/protean/container.py b/src/protean/container.py index 622a559f..fd7e7357 100644 --- a/src/protean/container.py +++ b/src/protean/container.py @@ -14,7 +14,6 @@ ValidationError, ) from protean.fields import Auto, Field, FieldBase, ValueObject -from protean.globals import current_domain from protean.reflection import id_field from protean.utils import generate_identity @@ -416,6 +415,7 @@ def raise_(self, event, fact_event=False) -> None: _metadata={ "id": (f"{stream_name}-{self._version}"), "type": event._metadata.type, + "fqn": event._metadata.fqn, "kind": event._metadata.kind, "stream_name": stream_name, "origin_stream_name": event._metadata.origin_stream_name, diff --git a/src/protean/core/command.py b/src/protean/core/command.py index 4fd7621e..10e26b34 100644 --- a/src/protean/core/command.py +++ b/src/protean/core/command.py @@ -9,7 +9,7 @@ from protean.fields import Field, ValueObject from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields, fields -from protean.utils import DomainObjects, derive_element_class +from protean.utils import DomainObjects, derive_element_class, fqn class BaseCommand(BaseContainer, OptionsMixin): @@ -58,6 +58,7 @@ def __init__(self, *args, **kwargs): self._metadata = Metadata( self._metadata.to_dict(), # Template kind="COMMAND", + fqn=fqn(self.__class__), origin_stream_name=origin_stream_name, version=version, ) @@ -116,6 +117,17 @@ def __track_id_field(subclass): # No Identity fields declared pass + def to_dict(self): + """Return data as a dictionary. + + We need to override this method in Command, because `to_dict()` of `BaseContainer` + eliminates `_metadata`. + """ + return { + field_name: field_obj.as_dict(getattr(self, field_name, None)) + for field_name, field_obj in fields(self).items() + } + def command_factory(element_cls, domain, **opts): element_cls = derive_element_class(element_cls, BaseCommand, **opts) diff --git a/src/protean/core/command_handler.py b/src/protean/core/command_handler.py index b08a2cab..2d024bd9 100644 --- a/src/protean/core/command_handler.py +++ b/src/protean/core/command_handler.py @@ -3,7 +3,7 @@ from protean.container import Element, OptionsMixin from protean.core.command import BaseCommand from protean.exceptions import IncorrectUsageError, NotSupportedError -from protean.utils import DomainObjects, derive_element_class, fully_qualified_name +from protean.utils import DomainObjects, derive_element_class from protean.utils.mixins import HandlerMixin @@ -45,23 +45,6 @@ def command_handler_factory(element_cls, domain, **opts): if not ( method_name.startswith("__") and method_name.endswith("__") ) and hasattr(method, "_target_cls"): - # Do not allow multiple handlers per command - if ( - fully_qualified_name(method._target_cls) in element_cls._handlers - and len( - element_cls._handlers[fully_qualified_name(method._target_cls)] - ) - != 0 - ): - raise NotSupportedError( - f"Command {method._target_cls.__name__} cannot be handled by multiple handlers" - ) - - # `_handlers` maps the command to its handler method - element_cls._handlers[fully_qualified_name(method._target_cls)].add( - method - ) - # Throw error if target_cls is not a Command if not inspect.isclass(method._target_cls) or not issubclass( method._target_cls, BaseCommand @@ -96,6 +79,24 @@ def command_handler_factory(element_cls, domain, **opts): } ) + command_type = ( + method._target_cls.__type__ + if issubclass(method._target_cls, BaseCommand) + else method._target_cls + ) + + # Do not allow multiple handlers per command + if ( + command_type in element_cls._handlers + and len(element_cls._handlers[command_type]) != 0 + ): + raise NotSupportedError( + f"Command {method._target_cls.__name__} cannot be handled by multiple handlers" + ) + + # `_handlers` maps the command to its handler method + element_cls._handlers[command_type].add(method) + # Associate Command with the handler's stream # Order of preference: # 1. Stream name defined in command diff --git a/src/protean/core/entity.py b/src/protean/core/entity.py index 899d03e9..1acc7fd3 100644 --- a/src/protean/core/entity.py +++ b/src/protean/core/entity.py @@ -469,6 +469,7 @@ def raise_(self, event) -> None: _metadata={ "id": (f"{stream_name}-{aggregate_version}.{event_number}"), "type": event._metadata.type, + "fqn": event._metadata.fqn, "kind": event._metadata.kind, "stream_name": stream_name, "origin_stream_name": event._metadata.origin_stream_name, diff --git a/src/protean/core/event.py b/src/protean/core/event.py index 84e583f7..947b57d2 100644 --- a/src/protean/core/event.py +++ b/src/protean/core/event.py @@ -12,20 +12,26 @@ from protean.fields import DateTime, Field, Integer, String, ValueObject from protean.globals import g from protean.reflection import _ID_FIELD_NAME, declared_fields, fields -from protean.utils import DomainObjects, derive_element_class +from protean.utils import DomainObjects, derive_element_class, fqn logger = logging.getLogger(__name__) class Metadata(BaseValueObject): - # Unique identifier of the Event - # Format is .... + # Unique identifier of the event/command + # + # FIXME Fix the format documentation + # Event Format is .... + # Command Format is .. id = String() # Type of the event # Format is .. type = String() + # Fully Qualified Name of the event/command + fqn = String() + # Kind of the object # Can be one of "EVENT", "COMMAND" kind = String() @@ -40,9 +46,10 @@ class Metadata(BaseValueObject): timestamp = DateTime(default=lambda: datetime.now(timezone.utc)) # Version of the event - # Can be overridden with `__version__` class attr in Event class definition + # Can be overridden with `__version__` class attr in event/command class definition version = String(default="v1") + # Applies to Events only # Sequence of the event in the aggregate # This is the version of the aggregate as it will be *after* persistence. # @@ -146,6 +153,7 @@ def __init__(self, *args, **kwargs): self._metadata.to_dict(), # Template from old Metadata type=self.__class__.__type__, kind="EVENT", + fqn=fqn(self.__class__), origin_stream_name=origin_stream_name, version=self.__class__.__version__, # Was set in `__init_subclass__` ) diff --git a/src/protean/core/event_handler.py b/src/protean/core/event_handler.py index 56560775..c3b3cfbd 100644 --- a/src/protean/core/event_handler.py +++ b/src/protean/core/event_handler.py @@ -2,8 +2,9 @@ import logging from protean.container import Element, OptionsMixin +from protean.core.event import BaseEvent from protean.exceptions import IncorrectUsageError, NotSupportedError -from protean.utils import DomainObjects, derive_element_class, fully_qualified_name +from protean.utils import DomainObjects, derive_element_class from protean.utils.mixins import HandlerMixin logger = logging.getLogger(__name__) @@ -59,8 +60,12 @@ def event_handler_factory(element_cls, domain, **opts): # can have only one `$any` handler method. element_cls._handlers["$any"] = {method} else: - element_cls._handlers[fully_qualified_name(method._target_cls)].add( - method + # Target could be an event or an event type string + event_type = ( + method._target_cls.__type__ + if issubclass(method._target_cls, BaseEvent) + else method._target_cls ) + element_cls._handlers[event_type].add(method) return element_cls diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index 50151fc0..0ce03109 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -8,7 +8,7 @@ ) from protean.globals import _uow_context_stack, current_domain from protean.reflection import id_field -from protean.utils import EventProcessing, fqn +from protean.utils import EventProcessing logger = logging.getLogger(__name__) @@ -90,7 +90,7 @@ def commit(self): # noqa: C901 handler_classes = current_domain.handlers_for(event) for handler_cls in handler_classes: handler_methods = ( - handler_cls._handlers[fqn(event.__class__)] + handler_cls._handlers[event.__class__.__type__] or handler_cls._handlers["$any"] ) diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index d665f11c..cb8ab43f 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -910,7 +910,7 @@ def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None: handler_classes = self.handlers_for(event) for handler_cls in handler_classes: handler_methods = ( - handler_cls._handlers[fqn(event.__class__)] + handler_cls._handlers[event.__class__.__type__] or handler_cls._handlers["$any"] ) @@ -939,11 +939,9 @@ def _enrich_command(self, command: BaseCommand) -> BaseCommand: command_with_metadata = command.__class__( command.to_dict(), _metadata={ - "id": (str(uuid4())), - "type": ( - f"{command.meta_.part_of.__class__.__name__}.{command.__class__.__name__}." - f"{command._metadata.version}" - ), + "id": identifier, # FIXME Double check command ID format and construction + "type": command.__class__.__type__, + "fqn": command._metadata.fqn, "kind": "EVENT", "stream_name": stream_name, "origin_stream_name": origin_stream_name, @@ -976,6 +974,18 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A Returns: Optional[Any]: Returns either the command handler's return value or nothing, based on preference. """ + if ( + fqn(command.__class__) + not in self.registry._elements[DomainObjects.COMMAND.value] + ): + raise IncorrectUsageError( + { + "element": [ + f"Element {command.__class__.__name__} is not registered in domain {self.name}" + ] + } + ) + command_with_metadata = self._enrich_command(command) position = self.event_store.store.append(command_with_metadata) @@ -986,7 +996,7 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A handler_class = self.command_handler_for(command) if handler_class: handler_method = next( - iter(handler_class._handlers[fqn(command.__class__)]) + iter(handler_class._handlers[command.__class__.__type__]) ) handler_method(handler_class(), command) diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index aaa2968f..1bf0fc04 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -13,7 +13,6 @@ from protean.core.unit_of_work import UnitOfWork from protean.exceptions import ConfigurationError, InvalidDataError from protean.globals import current_domain -from protean.utils import fully_qualified_name logger = logging.getLogger(__name__) @@ -82,9 +81,9 @@ def from_dict(cls, message: Dict) -> Message: def to_object(self) -> Union[BaseEvent, BaseCommand]: if self.metadata.kind == MessageType.EVENT.value: - element_record = current_domain.registry.events[self.type] + element_record = current_domain.registry.events[self.metadata.fqn] elif self.metadata.kind == MessageType.COMMAND.value: - element_record = current_domain.registry.commands[self.type] + element_record = current_domain.registry.commands[self.metadata.fqn] else: raise InvalidDataError( {"_message": ["Message type is not supported for deserialization"]} @@ -110,7 +109,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: return cls( stream_name=message_object._metadata.stream_name, - type=fully_qualified_name(message_object.__class__), + type=message_object.__class__.__type__, data=message_object.to_dict(), metadata=message_object._metadata, expected_version=expected_version, @@ -162,7 +161,7 @@ def __init_subclass__(subclass) -> None: @classmethod def _handle(cls, message: Message) -> None: # Use Event-specific handlers if available, or fallback on `$any` if defined - handlers = cls._handlers[message.type] or cls._handlers["$any"] + handlers = cls._handlers[message.metadata.type] or cls._handlers["$any"] for handler_method in handlers: handler_method(cls(), message.to_object()) diff --git a/tests/adapters/broker/redis_broker/tests.py b/tests/adapters/broker/redis_broker/tests.py index 3838a9be..9a5e6b7e 100644 --- a/tests/adapters/broker/redis_broker/tests.py +++ b/tests/adapters/broker/redis_broker/tests.py @@ -60,7 +60,9 @@ def test_event_message_structure(self, test_domain): "metadata", ] ) - assert json_message["type"] == "redis_broker.elements.PersonAdded" + assert ( + json_message["type"] == "Redis Broker Tests.PersonAdded.v1" + ) # FIXME Normalize Domain Name assert json_message["metadata"]["kind"] == "EVENT" diff --git a/tests/aggregate/events/test_aggregate_streams.py b/tests/aggregate/events/test_aggregate_event_streams.py similarity index 100% rename from tests/aggregate/events/test_aggregate_streams.py rename to tests/aggregate/events/test_aggregate_event_streams.py diff --git a/tests/command/test_command_metadata.py b/tests/command/test_command_metadata.py index 4d594a2c..60783b6c 100644 --- a/tests/command/test_command_metadata.py +++ b/tests/command/test_command_metadata.py @@ -5,6 +5,7 @@ from protean import BaseAggregate, BaseCommand from protean.fields import Identifier, String from protean.reflection import fields +from protean.utils import fqn class User(BaseAggregate): @@ -56,3 +57,27 @@ class Login(BaseCommand): command = Login(user_id=str(uuid4())) assert command._metadata.version == "v2" + + +def test_command_metadata(test_domain): + identifier = str(uuid4()) + command = test_domain._enrich_command(Login(user_id=identifier)) + + assert ( + command.to_dict() + == { + "_metadata": { + "id": f"{identifier}", # FIXME Double-check command identifier format and construction + "type": "Test.Login.v1", + "fqn": fqn(Login), + "kind": "COMMAND", + "stream_name": f"user:command-{identifier}", + "origin_stream_name": None, + "timestamp": str(command._metadata.timestamp), + "version": "v1", + "sequence_id": None, + "payload_hash": command._metadata.payload_hash, + }, + "user_id": command.user_id, + } + ) diff --git a/tests/command_handler/test_handle_decorator_in_command_handlers.py b/tests/command_handler/test_handle_decorator_in_command_handlers.py index 3c5a6e85..26a97e66 100644 --- a/tests/command_handler/test_handle_decorator_in_command_handlers.py +++ b/tests/command_handler/test_handle_decorator_in_command_handlers.py @@ -4,7 +4,6 @@ from protean.core.command_handler import BaseCommandHandler from protean.exceptions import NotSupportedError from protean.fields import Identifier, String -from protean.utils import fully_qualified_name class User(BaseAggregate): @@ -32,7 +31,7 @@ def register(self, command: Register) -> None: test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) - assert fully_qualified_name(Register) in UserCommandHandlers._handlers + assert Register.__type__ in UserCommandHandlers._handlers def test_that_multiple_handlers_can_be_recorded_against_command_handler(test_domain): @@ -55,19 +54,19 @@ def update_billing_address(self, event: ChangeAddress) -> None: assert all( handle_name in UserCommandHandlers._handlers for handle_name in [ - fully_qualified_name(Register), - fully_qualified_name(ChangeAddress), + Register.__type__, + ChangeAddress.__type__, ] ) - assert len(UserCommandHandlers._handlers[fully_qualified_name(Register)]) == 1 - assert len(UserCommandHandlers._handlers[fully_qualified_name(ChangeAddress)]) == 1 + assert len(UserCommandHandlers._handlers[Register.__type__]) == 1 + assert len(UserCommandHandlers._handlers[ChangeAddress.__type__]) == 1 assert ( - next(iter(UserCommandHandlers._handlers[fully_qualified_name(Register)])) + next(iter(UserCommandHandlers._handlers[Register.__type__])) == UserCommandHandlers.register ) assert ( - next(iter(UserCommandHandlers._handlers[fully_qualified_name(ChangeAddress)])) + next(iter(UserCommandHandlers._handlers[ChangeAddress.__type__])) == UserCommandHandlers.update_billing_address ) diff --git a/tests/command_handler/test_inline_command_processing.py b/tests/command_handler/test_inline_command_processing.py index baf3ed32..f7da4f4e 100644 --- a/tests/command_handler/test_inline_command_processing.py +++ b/tests/command_handler/test_inline_command_processing.py @@ -1,7 +1,10 @@ from uuid import uuid4 +import pytest + from protean import BaseAggregate, BaseCommand, handle from protean.core.command_handler import BaseCommandHandler +from protean.exceptions import IncorrectUsageError from protean.fields import Identifier, String from protean.utils import CommandProcessing @@ -19,6 +22,10 @@ class Register(BaseCommand): email = String() +class Login(BaseCommand): + user_id = Identifier() + + class UserCommandHandlers(BaseCommandHandler): @handle(Register) def register(self, event: Register) -> None: @@ -26,24 +33,32 @@ def register(self, event: Register) -> None: counter += 1 -def test_that_command_can_be_processed_inline(test_domain): +@pytest.fixture(autouse=True) +def register(test_domain): test_domain.register(User) test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) - assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value - test_domain.process(Register(user_id=str(uuid4()), email="john.doe@gmail.com")) - assert counter == 1 +def test_unregistered_command_raises_error(test_domain): + with pytest.raises(IncorrectUsageError): + test_domain.process(Login(user_id=str(uuid4()))) -def test_that_command_is_persisted_in_message_store(test_domain): +def test_that_command_can_be_processed_inline(test_domain): test_domain.register(User) test_domain.register(Register, part_of=User) test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) + assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value + + test_domain.process(Register(user_id=str(uuid4()), email="john.doe@gmail.com")) + assert counter == 1 + + +def test_that_command_is_persisted_in_message_store(test_domain): identifier = str(uuid4()) test_domain.process(Register(user_id=identifier, email="john.doe@gmail.com")) diff --git a/tests/command_handler/test_retrieving_handlers_by_command.py b/tests/command_handler/test_retrieving_handlers_by_command.py index 3e4962f2..c08e9013 100644 --- a/tests/command_handler/test_retrieving_handlers_by_command.py +++ b/tests/command_handler/test_retrieving_handlers_by_command.py @@ -78,7 +78,7 @@ def test_for_no_errors_when_no_handler_method_has_not_been_defined_for_a_command test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) - assert test_domain.command_handler_for(ChangeAddress) is None + assert test_domain.command_handler_for(ChangeAddress()) is None def test_retrieving_handlers_for_unknown_command(test_domain): diff --git a/tests/event/test_event_metadata.py b/tests/event/test_event_metadata.py index 4d90cd7f..b6f9c5ce 100644 --- a/tests/event/test_event_metadata.py +++ b/tests/event/test_event_metadata.py @@ -7,6 +7,7 @@ from protean.fields import String, ValueObject from protean.fields.basic import Identifier from protean.reflection import fields +from protean.utils import fqn class User(BaseEventSourcedAggregate): @@ -102,6 +103,7 @@ def test_event_metadata(): "_metadata": { "id": f"user-{user.id}-0", "type": "Test.UserLoggedIn.v1", + "fqn": fqn(UserLoggedIn), "kind": "EVENT", "stream_name": f"user-{user.id}", "origin_stream_name": None, diff --git a/tests/event/test_event_payload.py b/tests/event/test_event_payload.py index 4ff0341e..2bf83ba0 100644 --- a/tests/event/test_event_payload.py +++ b/tests/event/test_event_payload.py @@ -5,6 +5,7 @@ from protean import BaseEvent, BaseEventSourcedAggregate from protean.fields import String from protean.fields.basic import Identifier +from protean.utils import fqn class User(BaseEventSourcedAggregate): @@ -38,6 +39,7 @@ def test_event_payload(): "_metadata": { "id": f"user-{user_id}-0", "type": "Test.UserLoggedIn.v1", + "fqn": fqn(UserLoggedIn), "kind": "EVENT", "stream_name": f"user-{user_id}", "origin_stream_name": None, diff --git a/tests/event/tests.py b/tests/event/tests.py index 0eb37942..759c2271 100644 --- a/tests/event/tests.py +++ b/tests/event/tests.py @@ -58,6 +58,7 @@ class UserAdded(BaseEvent): "_metadata": { "id": None, # ID is none because the event is not being raised in the proper way (with `_raise`) "type": "Test.UserAdded.v1", + "fqn": fully_qualified_name(UserAdded), "kind": "EVENT", "stream_name": None, # Type is none here because of the same reason as above "origin_stream_name": None, diff --git a/tests/event_handler/test_handle_decorator_in_event_handlers.py b/tests/event_handler/test_handle_decorator_in_event_handlers.py index ba0e308e..068726af 100644 --- a/tests/event_handler/test_handle_decorator_in_event_handlers.py +++ b/tests/event_handler/test_handle_decorator_in_event_handlers.py @@ -2,7 +2,6 @@ from protean import BaseAggregate, BaseEvent, BaseEventHandler, handle from protean.fields import Identifier, String -from protean.utils import fully_qualified_name class User(BaseAggregate): @@ -27,9 +26,10 @@ def send_email_notification(self, event: Registered) -> None: pass test_domain.register(User) + test_domain.register(Registered, part_of=User) test_domain.register(UserEventHandlers, part_of=User) - assert fully_qualified_name(Registered) in UserEventHandlers._handlers + assert Registered.__type__ in UserEventHandlers._handlers def test_that_multiple_handlers_can_be_recorded_against_event_handler(test_domain): @@ -43,25 +43,27 @@ def updated_billing_address(self, event: AddressChanged) -> None: pass test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(AddressChanged, part_of=User) test_domain.register(UserEventHandlers, part_of=User) assert len(UserEventHandlers._handlers) == 2 assert all( handle_name in UserEventHandlers._handlers for handle_name in [ - fully_qualified_name(Registered), - fully_qualified_name(AddressChanged), + Registered.__type__, + AddressChanged.__type__, ] ) - assert len(UserEventHandlers._handlers[fully_qualified_name(Registered)]) == 1 - assert len(UserEventHandlers._handlers[fully_qualified_name(AddressChanged)]) == 1 + assert len(UserEventHandlers._handlers[Registered.__type__]) == 1 + assert len(UserEventHandlers._handlers[AddressChanged.__type__]) == 1 assert ( - next(iter(UserEventHandlers._handlers[fully_qualified_name(Registered)])) + next(iter(UserEventHandlers._handlers[Registered.__type__])) == UserEventHandlers.send_email_notification ) assert ( - next(iter(UserEventHandlers._handlers[fully_qualified_name(AddressChanged)])) + next(iter(UserEventHandlers._handlers[AddressChanged.__type__])) == UserEventHandlers.updated_billing_address ) @@ -81,9 +83,7 @@ def provision_user_accounts(self, event: Registered) -> None: assert len(UserEventHandlers._handlers) == 1 # Against Registered Event - handlers_for_registered = UserEventHandlers._handlers[ - fully_qualified_name(Registered) - ] + handlers_for_registered = UserEventHandlers._handlers[Registered.__type__] assert len(handlers_for_registered) == 2 assert all( handler_method in handlers_for_registered diff --git a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py index 92460ee3..72864a13 100644 --- a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py +++ b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py @@ -9,11 +9,10 @@ from protean.core.event_sourced_aggregate import apply from protean.fields import Identifier, String from protean.globals import current_domain -from protean.utils import fqn class Register(BaseCommand): - id = Identifier() + id = Identifier(identifier=True) email = String() name = String() password_hash = String() @@ -75,7 +74,7 @@ def register_elements(test_domain): @pytest.mark.eventstore def test_that_events_can_be_raised_from_within_aggregates(test_domain): identifier = str(uuid4()) - UserCommandHandler().register_user( + test_domain.process( Register( id=identifier, email="john.doe@example.com", @@ -88,4 +87,10 @@ def test_that_events_can_be_raised_from_within_aggregates(test_domain): assert len(messages) == 1 assert messages[0]["stream_name"] == f"user-{identifier}" - assert messages[0]["type"] == f"{fqn(Registered)}" + assert messages[0]["type"] == Registered.__type__ + + messages = test_domain.event_store.store._read("user:command") + + assert len(messages) == 1 + assert messages[0]["stream_name"] == f"user:command-{identifier}" + assert messages[0]["type"] == Register.__type__ diff --git a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py index 8a75fe1c..c3548e80 100644 --- a/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py +++ b/tests/event_sourced_aggregates/test_raising_multiple_events_for_one_aggregate_in_a_uow.py @@ -14,7 +14,6 @@ ) from protean.fields import Identifier, String from protean.globals import current_domain -from protean.utils import fqn class Register(BaseCommand): @@ -100,6 +99,6 @@ def test_that_multiple_events_are_raised_per_aggregate_in_the_same_uow(test_doma messages = test_domain.event_store.store._read("user") assert len(messages) == 3 - assert messages[0]["type"] == f"{fqn(Registered)}" - assert messages[1]["type"] == f"{fqn(Renamed)}" - assert messages[2]["type"] == f"{fqn(Renamed)}" + assert messages[0]["type"] == Registered.__type__ + assert messages[1]["type"] == Renamed.__type__ + assert messages[2]["type"] == Renamed.__type__ diff --git a/tests/event_store/test_reading_messages.py b/tests/event_store/test_reading_messages.py index 38d58281..999a43b9 100644 --- a/tests/event_store/test_reading_messages.py +++ b/tests/event_store/test_reading_messages.py @@ -7,7 +7,6 @@ from protean import BaseEvent, BaseEventSourcedAggregate from protean.fields import String from protean.fields.basic import Identifier -from protean.utils import fqn from protean.utils.mixins import Message @@ -136,5 +135,5 @@ def test_reading_messages_by_category(test_domain, activated_user): def test_reading_last_message(test_domain, renamed_user): # Reading by stream message = test_domain.event_store.store.read_last_message(f"user-{renamed_user.id}") - assert message.type == fqn(Renamed) + assert message.type == Renamed.__type__ assert message.data["name"] == "John Doe 9" diff --git a/tests/event_store/test_streams_initialization.py b/tests/event_store/test_streams_initialization.py index 03b5ff10..2b648a96 100644 --- a/tests/event_store/test_streams_initialization.py +++ b/tests/event_store/test_streams_initialization.py @@ -57,7 +57,10 @@ def record_sent_email(self, event: Sent) -> None: @pytest.fixture(autouse=True) def register(test_domain): test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) test_domain.register(Email) + test_domain.register(Sent, part_of=Email) test_domain.register(UserEventHandler, part_of=User) test_domain.register(EmailEventHandler, part_of=Email) test_domain.init(traverse=False) diff --git a/tests/message/test_object_to_message.py b/tests/message/test_object_to_message.py index c208a464..b8b6c7f7 100644 --- a/tests/message/test_object_to_message.py +++ b/tests/message/test_object_to_message.py @@ -5,7 +5,6 @@ from protean import BaseCommand, BaseEvent, BaseEventSourcedAggregate from protean.exceptions import ConfigurationError from protean.fields import Identifier, String -from protean.utils import fully_qualified_name from protean.utils.mixins import Message @@ -64,7 +63,7 @@ def test_construct_message_from_event(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Registered) + assert message.type == Registered.__type__ assert message.stream_name == f"{User.meta_.stream_name}-{identifier}" assert message.metadata.kind == "EVENT" assert message.data == user._events[-1].to_dict() @@ -74,7 +73,7 @@ def test_construct_message_from_event(test_domain): # Verify Message Dict message_dict = message.to_dict() - assert message_dict["type"] == fully_qualified_name(Registered) + assert message_dict["type"] == Registered.__type__ assert message_dict["metadata"]["kind"] == "EVENT" assert message_dict["stream_name"] == f"{User.meta_.stream_name}-{identifier}" assert message_dict["data"] == user._events[-1].to_dict() @@ -87,6 +86,7 @@ def test_construct_message_from_event(test_domain): def test_construct_message_from_command(test_domain): identifier = str(uuid4()) command = Register(id=identifier, email="john.doe@gmail.com", name="John Doe") + command_with_metadata = test_domain._enrich_command(command) test_domain.process(command) messages = test_domain.event_store.store.read("user:command") @@ -96,20 +96,20 @@ def test_construct_message_from_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Register) + assert message.type == Register.__type__ assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" - assert message.data == command.to_dict() + assert message.data == command_with_metadata.to_dict() assert message.time is not None # Verify Message Dict message_dict = message.to_dict() - assert message_dict["type"] == fully_qualified_name(Register) + assert message_dict["type"] == Register.__type__ assert message_dict["metadata"]["kind"] == "COMMAND" assert ( message_dict["stream_name"] == f"{User.meta_.stream_name}:command-{identifier}" ) - assert message_dict["data"] == command.to_dict() + assert message_dict["data"] == command_with_metadata.to_dict() assert message_dict["time"] is not None @@ -147,7 +147,7 @@ def test_construct_message_from_either_event_or_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Register) + assert message.type == Register.__type__ assert message.stream_name == f"{User.meta_.stream_name}:command-{identifier}" assert message.metadata.kind == "COMMAND" assert message.data == command.to_dict() @@ -163,7 +163,7 @@ def test_construct_message_from_either_event_or_command(test_domain): assert type(message) is Message # Verify Message Content - assert message.type == fully_qualified_name(Registered) + assert message.type == Registered.__type__ assert message.stream_name == f"{User.meta_.stream_name}-{identifier}" assert message.metadata.kind == "EVENT" assert message.data == event.to_dict() diff --git a/tests/server/test_command_handling.py b/tests/server/test_command_handling.py index bc532241..df1fba98 100644 --- a/tests/server/test_command_handling.py +++ b/tests/server/test_command_handling.py @@ -52,7 +52,8 @@ async def test_handler_invocation(test_domain): user_id=identifier, email="john.doe@example.com", ) - message = Message.to_message(command) + enriched_command = test_domain._enrich_command(command) + message = Message.to_message(enriched_command) engine = Engine(domain=test_domain, test_mode=True) await engine.handle_message(UserCommandHandler, message) diff --git a/tests/server/test_event_handler_subscription.py b/tests/server/test_event_handler_subscription.py index 3180b8f9..a503858c 100644 --- a/tests/server/test_event_handler_subscription.py +++ b/tests/server/test_event_handler_subscription.py @@ -73,6 +73,8 @@ def setup_event_loop(): def test_event_subscriptions(test_domain): test_domain.register(User) + test_domain.register(Registered, part_of=User) + test_domain.register(Activated, part_of=User) test_domain.register(UserEventHandler, part_of=User) engine = Engine(test_domain, test_mode=True) @@ -83,6 +85,7 @@ def test_event_subscriptions(test_domain): def test_origin_stream_name_in_subscription(test_domain): test_domain.register(User) + test_domain.register(Sent, part_of=User) test_domain.register(EmailEventHandler, part_of=User, source_stream="email") engine = Engine(test_domain, test_mode=True) diff --git a/tests/subscription/test_message_filtering_with_origin_stream.py b/tests/subscription/test_message_filtering_with_origin_stream.py index be3595c7..6e67f55d 100644 --- a/tests/subscription/test_message_filtering_with_origin_stream.py +++ b/tests/subscription/test_message_filtering_with_origin_stream.py @@ -113,4 +113,4 @@ async def test_message_filtering_for_event_handlers_with_defined_origin_stream( ) assert len(filtered_messages) == 1 - assert filtered_messages[0].type == fqn(Sent) + assert filtered_messages[0].type == Sent.__type__ diff --git a/tests/subscription/test_no_message_filtering.py b/tests/subscription/test_no_message_filtering.py index fffa869d..7065cef5 100644 --- a/tests/subscription/test_no_message_filtering.py +++ b/tests/subscription/test_no_message_filtering.py @@ -112,5 +112,5 @@ async def test_no_filtering_for_event_handlers_without_defined_origin_stream( ) assert len(filtered_messages) == 3 - assert filtered_messages[0].type == fqn(Registered) - assert filtered_messages[2].type == fqn(Sent) + assert filtered_messages[0].type == Registered.__type__ + assert filtered_messages[2].type == Sent.__type__ diff --git a/tests/test_commands.py b/tests/test_commands.py index b37aa540..9b7ac0eb 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -15,7 +15,7 @@ class User(BaseAggregate): class UserRegistrationCommand(BaseCommand): - email = String(required=True, max_length=250) + email = String(required=True, identifier=True, max_length=250) username = String(required=True, max_length=50) password = String(required=True, max_length=255) age = Integer(default=21) @@ -83,7 +83,9 @@ def test_two_commands_with_equal_values_are_considered_equal(self): email="john.doe@gmail.com", username="john.doe", password="secret1!" ) - assert command1 == command2 + # The commands themselves will not be equal because their metadata, like + # timestamp, can differ. But their payloads should be equal + assert command1.payload == command2.payload def test_that_commands_are_immutable(self): command = UserRegistrationCommand( @@ -96,7 +98,7 @@ def test_output_to_dict(self): command = UserRegistrationCommand( email="john.doe@gmail.com", username="john.doe", password="secret1!" ) - assert command.to_dict() == { + assert command.payload == { "email": "john.doe@gmail.com", "username": "john.doe", "password": "secret1!", diff --git a/tests/unit_of_work/test_inline_event_processing.py b/tests/unit_of_work/test_inline_event_processing.py index f54eeca0..b8fb0a8c 100644 --- a/tests/unit_of_work/test_inline_event_processing.py +++ b/tests/unit_of_work/test_inline_event_processing.py @@ -93,13 +93,15 @@ def count_registrations(self, _: BaseEventHandler) -> None: @pytest.mark.eventstore def test_inline_event_processing_in_sync_mode(test_domain): test_domain.register(User) + test_domain.register(Register, part_of=User) test_domain.register(Registered, part_of=User) + test_domain.register(UserCommandHandler, part_of=User) test_domain.register(UserEventHandler, part_of=User) test_domain.register(UserMetrics, part_of=User) test_domain.init(traverse=False) identifier = str(uuid4()) - UserCommandHandler().register_user( + test_domain.process( Register( user_id=identifier, email="john.doe@example.com",