Skip to content

Commit

Permalink
Optimize fetching handlers for events
Browse files Browse the repository at this point in the history
The code to associate events to streams was incorrect because it was considering
handler methods (in event handlers that could belong to a different aggregate)
to associate streams with events. This commit fixes the bug and optimizes how
handlers for a specific event are fetched.
  • Loading branch information
subhashb committed Jun 19, 2024
1 parent eb2aba2 commit 7486072
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 32 deletions.
10 changes: 8 additions & 2 deletions src/protean/adapters/event_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,21 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:

all_stream_handlers = self._event_streams.get("$all", set())

# Take the Aggregate's stream_name
# Get the Aggregate's stream_name
aggregate_stream_name = None
if event.meta_.aggregate_cluster:
aggregate_stream_name = event.meta_.aggregate_cluster.meta_.stream_name

stream_name = event.meta_.stream_name or aggregate_stream_name
stream_handlers = self._event_streams.get(stream_name, set())

return set.union(stream_handlers, all_stream_handlers)
# Gather all handlers that are configured to run on this event
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
configured_stream_handlers.add(stream_handler)

return set.union(configured_stream_handlers, all_stream_handlers)

def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]:
if self._command_streams is None:
Expand Down
13 changes: 12 additions & 1 deletion src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,21 @@ def __setattr__(self, name, value):

@classmethod
def _default_options(cls):
part_of = (
getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None
)

# This method is called during class import, so we cannot use part_of if it
# is still a string. We ignore it for now, and resolve `stream_name` in
# the factory after the domain has resolved references.
# FIXME A better mechanism would be to not set stream_name here, unless explicitly
# specified, and resolve it during `domain.init()`
part_of = None if isinstance(part_of, str) else part_of

return [
("abstract", False),
("part_of", None),
("stream_name", None),
("stream_name", part_of.meta_.stream_name if part_of else None),
("aggregate_cluster", None),
]

Expand Down
18 changes: 0 additions & 18 deletions src/protean/core/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging

from protean.container import Element, OptionsMixin
from protean.core.event import BaseEvent
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.utils import DomainObjects, derive_element_class, fully_qualified_name
from protean.utils.mixins import HandlerMixin
Expand Down Expand Up @@ -64,21 +63,4 @@ def event_handler_factory(element_cls, **opts):
method
)

# Associate Event with the handler's stream
if inspect.isclass(method._target_cls) and issubclass(
method._target_cls, BaseEvent
):
# Order of preference:
# 1. Stream name defined in event
# 2. Stream name defined for the event handler
# 3. Stream name derived from aggregate
stream_name = element_cls.meta_.stream_name or (
element_cls.meta_.part_of.meta_.stream_name
if element_cls.meta_.part_of
else None
)
method._target_cls.meta_.stream_name = (
method._target_cls.meta_.stream_name or stream_name
)

return element_cls
6 changes: 6 additions & 0 deletions src/protean/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ def _resolve_references(self):
)
cls.meta_.part_of = to_cls

# Also set the stream name if there is a `stream_name` option in `meta_`
# FIXME Could this task be pushed to the element itself, so each element
# can do stuff beyond `stream_name`?
if hasattr(cls.meta_, "stream_name") and not cls.meta_.stream_name:
cls.meta_.stream_name = to_cls.meta_.stream_name

# Remove from pending list now that the class has been resolved
del self._pending_class_resolutions[name]

Expand Down
2 changes: 2 additions & 0 deletions src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ def __init_subclass__(subclass) -> None:
super().__init_subclass__()

# Associate a `_handlers` map with subclasses.
# `_handlers` is a dictionary mapping the event/command to handler methods.
#
# It needs to be initialized here because if it
# were initialized in __init__, the same collection object
# would be made available across all subclasses,
Expand Down
2 changes: 1 addition & 1 deletion tests/aggregate/test_aggregate_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def register_elements(test_domain):
test_domain.register(Account, part_of=User)
test_domain.register(UserActivated, part_of=User)
test_domain.register(UserRenamed, part_of=User)
test_domain.register(PasswordChanged, part_of=Account)
test_domain.register(PasswordChanged, part_of=User)


def test_that_aggregate_has_events_list():
Expand Down
31 changes: 31 additions & 0 deletions tests/event/test_event_part_of_resolution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest

from protean import BaseEvent, BaseEventSourcedAggregate
from protean.fields import String
from protean.fields.basic import Identifier


class User(BaseEventSourcedAggregate):
id = Identifier(identifier=True)
email = String()
name = String()


class UserLoggedIn(BaseEvent):
user_id = Identifier(identifier=True)


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(UserLoggedIn, part_of="User")


def test_event_does_not_have_stream_name_before_domain_init():
assert UserLoggedIn.meta_.stream_name is None


def test_event_has_stream_name_after_domain_init(test_domain):
test_domain.init(traverse=False)

assert UserLoggedIn.meta_.stream_name == "user"
16 changes: 6 additions & 10 deletions tests/event_handler/test_retrieving_handlers_by_event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import pytest

from protean import BaseEvent, BaseEventHandler, BaseEventSourcedAggregate, handle
from protean.fields import DateTime, Identifier, String

Expand Down Expand Up @@ -69,7 +71,8 @@ def universal_handler(self, _: BaseEvent) -> None:
pass


def test_retrieving_handler_by_event(test_domain):
@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(Registered, part_of=User)
test_domain.register(Activated, part_of=User)
Expand All @@ -80,6 +83,8 @@ def test_retrieving_handler_by_event(test_domain):
test_domain.register(Sent, part_of=Email)
test_domain.register(EmailEventHandler, part_of=Email)


def test_retrieving_handler_by_event(test_domain):
assert test_domain.handlers_for(Registered()) == {UserEventHandler, UserMetrics}
assert test_domain.handlers_for(Sent()) == {EmailEventHandler}

Expand All @@ -90,16 +95,7 @@ def test_that_all_streams_handler_is_returned(test_domain):


def test_that_all_streams_handler_is_always_returned_with_other_handlers(test_domain):
test_domain.register(User)
test_domain.register(Registered, part_of=User)
test_domain.register(Activated, part_of=User)
test_domain.register(Renamed, part_of=User)
test_domain.register(AllEventsHandler, stream_name="$all")
test_domain.register(UserEventHandler, part_of=User)
test_domain.register(UserMetrics, part_of=User)
test_domain.register(Email)
test_domain.register(Sent, part_of=Email)
test_domain.register(EmailEventHandler, part_of=Email)

assert test_domain.handlers_for(Registered()) == {
UserEventHandler,
Expand Down
1 change: 1 addition & 0 deletions tests/subscription/test_message_handover_to_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async def test_that_subscription_invokes_engine_handler_on_message(
mock_handle_message, test_domain
):
test_domain.register(User)
test_domain.register(Registered, part_of=User)
test_domain.register(UserEventHandler, part_of=User)

identifier = str(uuid4())
Expand Down

0 comments on commit 7486072

Please sign in to comment.