Skip to content

Commit

Permalink
Manage handlers by event/command type
Browse files Browse the repository at this point in the history
We were basing the logic on retrieving handlers on the fully
qualified name of the event or command class. This commit moves
the logic to be on the `__type__` attribute of event/command
classes.

This is one of the steps necessary to be able to handle events/
messages from other systems, where we may not have an event
registered in the domain. It is also necessary to be able to
support multiple versions of event and command classes.
  • Loading branch information
subhashb committed Jul 11, 2024
1 parent e4c8836 commit 5709fff
Show file tree
Hide file tree
Showing 32 changed files with 199 additions and 99 deletions.
8 changes: 4 additions & 4 deletions src/protean/adapters/event_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
)
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
if event.__class__.__type__ in stream_handler._handlers:
configured_stream_handlers.add(stream_handler)

return set.union(configured_stream_handlers, all_stream_handlers)
Expand All @@ -129,7 +129,7 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
for handler_cls in handler_classes:
try:
handler_method = next(
iter(handler_cls._handlers[fqn(command.__class__)])
iter(handler_cls._handlers[command.__class__.__type__])
)
handler_methods.add((handler_cls, handler_method))
except StopIteration:
Expand All @@ -149,7 +149,7 @@ def last_event_of_type(
events = [
event
for event in self.domain.event_store.store._read(stream_name)
if event["type"] == fqn(event_cls)
if event["type"] == event_cls.__type__
]

return Message.from_dict(events[-1]).to_object() if len(events) > 0 else None
Expand All @@ -175,5 +175,5 @@ def events_of_type(
return [
Message.from_dict(event).to_object()
for event in self.domain.event_store.store._read(stream_name)
if event["type"] == fqn(event_cls)
if event["type"] == event_cls.__type__
]
6 changes: 5 additions & 1 deletion src/protean/adapters/event_store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ def read(
if stream_name == "$all":
pass # Don't filter on stream name
elif self.is_category(stream_name):
q = q.filter(stream_name__contains=stream_name)
# If filtering on category, ensure the supplied stream name
# is the only thing in the category.
# Eg. If stream_name is 'user', then only 'user' should be in the category,
# and not even `user:command`
q = q.filter(stream_name__contains=f"{stream_name}-")
else:
q = q.filter(stream_name=stream_name)

Expand Down
2 changes: 1 addition & 1 deletion src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.globals import current_domain
from protean.reflection import id_field
from protean.utils import generate_identity

Expand Down Expand Up @@ -416,6 +415,7 @@ def raise_(self, event, fact_event=False) -> None:
_metadata={
"id": (f"{stream_name}-{self._version}"),
"type": event._metadata.type,
"fqn": event._metadata.fqn,
"kind": event._metadata.kind,
"stream_name": stream_name,
"origin_stream_name": event._metadata.origin_stream_name,
Expand Down
14 changes: 13 additions & 1 deletion src/protean/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from protean.fields import Field, ValueObject
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class
from protean.utils import DomainObjects, derive_element_class, fqn


class BaseCommand(BaseContainer, OptionsMixin):
Expand Down Expand Up @@ -58,6 +58,7 @@ def __init__(self, *args, **kwargs):
self._metadata = Metadata(
self._metadata.to_dict(), # Template
kind="COMMAND",
fqn=fqn(self.__class__),
origin_stream_name=origin_stream_name,
version=version,
)
Expand Down Expand Up @@ -116,6 +117,17 @@ def __track_id_field(subclass):
# No Identity fields declared
pass

def to_dict(self):
"""Return data as a dictionary.
We need to override this method in Command, because `to_dict()` of `BaseContainer`
eliminates `_metadata`.
"""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
}


def command_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseCommand, **opts)
Expand Down
37 changes: 19 additions & 18 deletions src/protean/core/command_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from protean.container import Element, OptionsMixin
from protean.core.command import BaseCommand
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.utils import DomainObjects, derive_element_class, fully_qualified_name
from protean.utils import DomainObjects, derive_element_class
from protean.utils.mixins import HandlerMixin


Expand Down Expand Up @@ -45,23 +45,6 @@ def command_handler_factory(element_cls, domain, **opts):
if not (
method_name.startswith("__") and method_name.endswith("__")
) and hasattr(method, "_target_cls"):
# Do not allow multiple handlers per command
if (
fully_qualified_name(method._target_cls) in element_cls._handlers
and len(
element_cls._handlers[fully_qualified_name(method._target_cls)]
)
!= 0
):
raise NotSupportedError(
f"Command {method._target_cls.__name__} cannot be handled by multiple handlers"
)

# `_handlers` maps the command to its handler method
element_cls._handlers[fully_qualified_name(method._target_cls)].add(
method
)

# Throw error if target_cls is not a Command
if not inspect.isclass(method._target_cls) or not issubclass(
method._target_cls, BaseCommand
Expand Down Expand Up @@ -96,6 +79,24 @@ def command_handler_factory(element_cls, domain, **opts):
}
)

command_type = (
method._target_cls.__type__
if issubclass(method._target_cls, BaseCommand)
else method._target_cls
)

# Do not allow multiple handlers per command
if (
command_type in element_cls._handlers
and len(element_cls._handlers[command_type]) != 0
):
raise NotSupportedError(
f"Command {method._target_cls.__name__} cannot be handled by multiple handlers"
)

# `_handlers` maps the command to its handler method
element_cls._handlers[command_type].add(method)

# Associate Command with the handler's stream
# Order of preference:
# 1. Stream name defined in command
Expand Down
1 change: 1 addition & 0 deletions src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ def raise_(self, event) -> None:
_metadata={
"id": (f"{stream_name}-{aggregate_version}.{event_number}"),
"type": event._metadata.type,
"fqn": event._metadata.fqn,
"kind": event._metadata.kind,
"stream_name": stream_name,
"origin_stream_name": event._metadata.origin_stream_name,
Expand Down
16 changes: 12 additions & 4 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@
from protean.fields import DateTime, Field, Integer, String, ValueObject
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class
from protean.utils import DomainObjects, derive_element_class, fqn

logger = logging.getLogger(__name__)


class Metadata(BaseValueObject):
# Unique identifier of the Event
# Format is <domain-name>.<event-class-name>.<event-version>.<aggregate-id>.<aggregate-version>
# Unique identifier of the event/command
#
# FIXME Fix the format documentation
# Event Format is <domain-name>.<class-name>.<version>.<aggregate-id>.<aggregate-version>
# Command Format is <domain-name>.<class-name>.<version>
id = String()

# Type of the event
# Format is <domain-name>.<event-class-name>.<event-version>
type = String()

# Fully Qualified Name of the event/command
fqn = String()

# Kind of the object
# Can be one of "EVENT", "COMMAND"
kind = String()
Expand All @@ -40,9 +46,10 @@ class Metadata(BaseValueObject):
timestamp = DateTime(default=lambda: datetime.now(timezone.utc))

# Version of the event
# Can be overridden with `__version__` class attr in Event class definition
# Can be overridden with `__version__` class attr in event/command class definition
version = String(default="v1")

# Applies to Events only
# Sequence of the event in the aggregate
# This is the version of the aggregate as it will be *after* persistence.
#
Expand Down Expand Up @@ -146,6 +153,7 @@ def __init__(self, *args, **kwargs):
self._metadata.to_dict(), # Template from old Metadata
type=self.__class__.__type__,
kind="EVENT",
fqn=fqn(self.__class__),
origin_stream_name=origin_stream_name,
version=self.__class__.__version__, # Was set in `__init_subclass__`
)
Expand Down
11 changes: 8 additions & 3 deletions src/protean/core/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import logging

from protean.container import Element, OptionsMixin
from protean.core.event import BaseEvent
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.utils import DomainObjects, derive_element_class, fully_qualified_name
from protean.utils import DomainObjects, derive_element_class
from protean.utils.mixins import HandlerMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,8 +60,12 @@ def event_handler_factory(element_cls, domain, **opts):
# can have only one `$any` handler method.
element_cls._handlers["$any"] = {method}
else:
element_cls._handlers[fully_qualified_name(method._target_cls)].add(
method
# Target could be an event or an event type string
event_type = (
method._target_cls.__type__
if issubclass(method._target_cls, BaseEvent)
else method._target_cls
)
element_cls._handlers[event_type].add(method)

return element_cls
4 changes: 2 additions & 2 deletions src/protean/core/unit_of_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)
from protean.globals import _uow_context_stack, current_domain
from protean.reflection import id_field
from protean.utils import EventProcessing, fqn
from protean.utils import EventProcessing

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,7 +90,7 @@ def commit(self): # noqa: C901
handler_classes = current_domain.handlers_for(event)
for handler_cls in handler_classes:
handler_methods = (
handler_cls._handlers[fqn(event.__class__)]
handler_cls._handlers[event.__class__.__type__]
or handler_cls._handlers["$any"]
)

Expand Down
24 changes: 17 additions & 7 deletions src/protean/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None:
handler_classes = self.handlers_for(event)
for handler_cls in handler_classes:
handler_methods = (
handler_cls._handlers[fqn(event.__class__)]
handler_cls._handlers[event.__class__.__type__]
or handler_cls._handlers["$any"]
)

Expand Down Expand Up @@ -939,11 +939,9 @@ def _enrich_command(self, command: BaseCommand) -> BaseCommand:
command_with_metadata = command.__class__(
command.to_dict(),
_metadata={
"id": (str(uuid4())),
"type": (
f"{command.meta_.part_of.__class__.__name__}.{command.__class__.__name__}."
f"{command._metadata.version}"
),
"id": identifier, # FIXME Double check command ID format and construction
"type": command.__class__.__type__,
"fqn": command._metadata.fqn,
"kind": "EVENT",
"stream_name": stream_name,
"origin_stream_name": origin_stream_name,
Expand Down Expand Up @@ -976,6 +974,18 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A
Returns:
Optional[Any]: Returns either the command handler's return value or nothing, based on preference.
"""
if (
fqn(command.__class__)
not in self.registry._elements[DomainObjects.COMMAND.value]
):
raise IncorrectUsageError(
{
"element": [
f"Element {command.__class__.__name__} is not registered in domain {self.name}"
]
}
)

command_with_metadata = self._enrich_command(command)
position = self.event_store.store.append(command_with_metadata)

Expand All @@ -986,7 +996,7 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A
handler_class = self.command_handler_for(command)
if handler_class:
handler_method = next(
iter(handler_class._handlers[fqn(command.__class__)])
iter(handler_class._handlers[command.__class__.__type__])
)
handler_method(handler_class(), command)

Expand Down
9 changes: 4 additions & 5 deletions src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from protean.core.unit_of_work import UnitOfWork
from protean.exceptions import ConfigurationError, InvalidDataError
from protean.globals import current_domain
from protean.utils import fully_qualified_name

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,9 +81,9 @@ def from_dict(cls, message: Dict) -> Message:

def to_object(self) -> Union[BaseEvent, BaseCommand]:
if self.metadata.kind == MessageType.EVENT.value:
element_record = current_domain.registry.events[self.type]
element_record = current_domain.registry.events[self.metadata.fqn]
elif self.metadata.kind == MessageType.COMMAND.value:
element_record = current_domain.registry.commands[self.type]
element_record = current_domain.registry.commands[self.metadata.fqn]
else:
raise InvalidDataError(
{"_message": ["Message type is not supported for deserialization"]}
Expand All @@ -110,7 +109,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message:

return cls(
stream_name=message_object._metadata.stream_name,
type=fully_qualified_name(message_object.__class__),
type=message_object.__class__.__type__,
data=message_object.to_dict(),
metadata=message_object._metadata,
expected_version=expected_version,
Expand Down Expand Up @@ -162,7 +161,7 @@ def __init_subclass__(subclass) -> None:
@classmethod
def _handle(cls, message: Message) -> None:
# Use Event-specific handlers if available, or fallback on `$any` if defined
handlers = cls._handlers[message.type] or cls._handlers["$any"]
handlers = cls._handlers[message.metadata.type] or cls._handlers["$any"]

for handler_method in handlers:
handler_method(cls(), message.to_object())
Expand Down
4 changes: 3 additions & 1 deletion tests/adapters/broker/redis_broker/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def test_event_message_structure(self, test_domain):
"metadata",
]
)
assert json_message["type"] == "redis_broker.elements.PersonAdded"
assert (
json_message["type"] == "Redis Broker Tests.PersonAdded.v1"
) # FIXME Normalize Domain Name
assert json_message["metadata"]["kind"] == "EVENT"


Expand Down
25 changes: 25 additions & 0 deletions tests/command/test_command_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from protean import BaseAggregate, BaseCommand
from protean.fields import Identifier, String
from protean.reflection import fields
from protean.utils import fqn


class User(BaseAggregate):
Expand Down Expand Up @@ -56,3 +57,27 @@ class Login(BaseCommand):

command = Login(user_id=str(uuid4()))
assert command._metadata.version == "v2"


def test_command_metadata(test_domain):
identifier = str(uuid4())
command = test_domain._enrich_command(Login(user_id=identifier))

assert (
command.to_dict()
== {
"_metadata": {
"id": f"{identifier}", # FIXME Double-check command identifier format and construction
"type": "Test.Login.v1",
"fqn": fqn(Login),
"kind": "COMMAND",
"stream_name": f"user:command-{identifier}",
"origin_stream_name": None,
"timestamp": str(command._metadata.timestamp),
"version": "v1",
"sequence_id": None,
"payload_hash": command._metadata.payload_hash,
},
"user_id": command.user_id,
}
)
Loading

0 comments on commit 5709fff

Please sign in to comment.