Skip to content

Commit

Permalink
Register raised events only on Aggregates
Browse files Browse the repository at this point in the history
This commit allows Entities to raise events. Events are always raised
on the root aggregate, irrespective of which entity actually raised them.
  • Loading branch information
subhashb committed Jun 10, 2024
1 parent 5f0be5c commit 3dbcf66
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 1,026 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ todos
# Temporary Python files
tester*.py
tester-*
*.patch

# Temporary Docs folder
docs-old/*
1,015 changes: 0 additions & 1,015 deletions mega_change.patch

This file was deleted.

13 changes: 12 additions & 1 deletion src/protean/adapters/event_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from typing import List, Optional, Type

from protean import BaseEvent, BaseEventHandler
from protean.core.aggregate import BaseAggregate
from protean.core.command import BaseCommand
from protean.core.command_handler import BaseCommandHandler
from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate
from protean.core.event_sourced_repository import (
BaseEventSourcedRepository,
event_sourced_repository_factory,
Expand Down Expand Up @@ -103,7 +105,16 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:

all_stream_handlers = self._event_streams.get("$all", set())

stream_name = event.meta_.stream_name or event.meta_.part_of.meta_.stream_name
# Recursively follow `part_of` trail until BaseAggregate and derive its stream_name
part_of = event.meta_.part_of
aggregate_stream_name = None
if part_of:
while not issubclass(part_of, (BaseAggregate, BaseEventSourcedAggregate)):
part_of = part_of.meta_.part_of

aggregate_stream_name = part_of.meta_.stream_name

stream_name = event.meta_.stream_name or aggregate_stream_name
stream_handlers = self._event_streams.get(stream_name, set())

return set.union(stream_handlers, all_stream_handlers)
Expand Down
12 changes: 11 additions & 1 deletion src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,18 @@ def _default_options(cls):

class EventedMixin:
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
"""Initialize an instance-level variable named `_events` to track events
raised in the aggregate cluster.
This method cannot have a super invocation, because we don't want it to
invoke BaseContainer's `__init__` method. But there is a conflict regarding
this between BaseAggregate and BaseEventSourcedAggregate. So this Mixin's
functionality has been replicated temporarily in BaseAggregate class.
Other mixins that are inherited by BaseEntity and BaseEventSourcedAggregate
work with `__init_subclass__`, and do not have this issue.
"""
super().__init__(*args, **kwargs)
self._events = []

def raise_(self, event) -> None:
Expand Down
3 changes: 1 addition & 2 deletions src/protean/core/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import inspect
import logging

from protean.container import EventedMixin
from protean.core.entity import BaseEntity
from protean.exceptions import NotSupportedError
from protean.fields import Integer
Expand All @@ -12,7 +11,7 @@
logger = logging.getLogger(__name__)


class BaseAggregate(EventedMixin, BaseEntity):
class BaseAggregate(BaseEntity):
"""This is the base class for Domain Aggregates.
Aggregates are fundamental, coarse-grained building blocks of a domain model. They are
Expand Down
10 changes: 10 additions & 0 deletions src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ def __init__(self, *template, **kwargs): # noqa: C901
# To control invariant checks
self._disable_invariant_checks = False

# Placeholder for temporary storage of raised events
self._events = []

# Collect Reference field attribute names to prevent accidental overwriting
# of shadow fields.
reference_attributes = {
Expand Down Expand Up @@ -390,6 +393,13 @@ def _postcheck(self, return_errors=False):
"""Invariant checks performed after initialization and attribute changes"""
return self._run_invariants("post", return_errors=return_errors)

def raise_(self, event) -> None:
"""Raise an event in the aggregate cluster.
The event is always registered on the aggregate, irrespective of where
it is raised in the entity cluster."""
self._root._events.append(event)

def __eq__(self, other):
"""Equivalence check to be based only on Identity"""

Expand Down
26 changes: 21 additions & 5 deletions src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from protean import fields
from protean.container import BaseContainer, OptionsMixin
from protean.core.command import BaseCommand
from protean.core.aggregate import BaseAggregate
from protean.core.event import BaseEvent
from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate
from protean.core.unit_of_work import UnitOfWork
Expand Down Expand Up @@ -134,8 +135,17 @@ def to_aggregate_event_message(
) -> Message:
identifier = getattr(aggregate, id_field(aggregate).field_name)

# Recursively follow `part_of` trail until BaseAggregate and derive its stream_name
part_of = event.meta_.part_of
aggregate_stream_name = None
if part_of:
while not issubclass(part_of, (BaseAggregate, BaseEventSourcedAggregate)):
part_of = part_of.meta_.part_of

aggregate_stream_name = part_of.meta_.stream_name

# Use explicit stream name if provided, or fallback on Aggregate's stream name
stream_name = event.meta_.stream_name or event.meta_.part_of.meta_.stream_name
stream_name = event.meta_.stream_name or aggregate_stream_name

return cls(
stream_name=f"{stream_name}-{identifier}",
Expand Down Expand Up @@ -172,11 +182,17 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message:
else:
identifier = str(uuid4())

# Recursively follow `part_of` trail until BaseAggregate and derive its stream_name
part_of = message_object.meta_.part_of
aggregate_stream_name = None
if part_of:
while not issubclass(part_of, (BaseAggregate, BaseEventSourcedAggregate)):
part_of = part_of.meta_.part_of

aggregate_stream_name = part_of.meta_.stream_name

# Use explicit stream name if provided, or fallback on Aggregate's stream name
stream_name = (
message_object.meta_.stream_name
or message_object.meta_.part_of.meta_.stream_name
)
stream_name = message_object.meta_.stream_name or aggregate_stream_name

if isinstance(message_object, BaseEvent):
stream_name = f"{stream_name}-{identifier}"
Expand Down
44 changes: 42 additions & 2 deletions tests/aggregate/test_aggregate_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import pytest

from protean import BaseAggregate, BaseEvent
from protean import BaseAggregate, BaseEntity, BaseEvent
from protean.core.unit_of_work import UnitOfWork
from protean.fields import Identifier, String
from protean.fields import Identifier, String, HasOne
from protean.globals import current_domain


Expand All @@ -13,11 +13,26 @@ class UserStatus(Enum):
ARCHIVED = "ARCHIVED"


class Account(BaseEntity):
password_hash = String(max_length=512)

def change_password(self, password):
self.password_hash = password
self.raise_(PasswordChanged(account_id=self.id, user_id=self.user_id))


class PasswordChanged(BaseEvent):
account_id = Identifier(required=True)
user_id = Identifier(required=True)


class User(BaseAggregate):
name = String(max_length=50, required=True)
email = String(required=True)
status = String(choices=UserStatus)

account = HasOne(Account)

def activate(self):
self.raise_(UserActivated(user_id=self.id))

Expand All @@ -37,8 +52,10 @@ class UserRenamed(BaseEvent):
@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(Account, part_of=User)
test_domain.register(UserActivated, part_of=User)
test_domain.register(UserRenamed, part_of=User)
test_domain.register(PasswordChanged, part_of=Account)


def test_that_aggregate_has_events_list():
Expand Down Expand Up @@ -76,3 +93,26 @@ def test_that_events_are_empty_after_uow():
user_repo.add(user)

assert len(user._events) == 0


@pytest.mark.eventstore
def test_events_can_be_raised_by_entities():
user = User(
name="John Doe",
email="[email protected]",
account=Account(password_hash="password"),
)

user.account.change_password("new_password")

assert len(user._events) == 1
# Events are still stored at the aggregate level
assert len(user.account._events) == 0
assert isinstance(user._events[0], PasswordChanged)

with UnitOfWork():
user_repo = current_domain.repository_for(User)
user_repo.add(user)

assert len(user._events) == 0
assert len(user.account._events) == 0
8 changes: 8 additions & 0 deletions tests/unit_of_work/test_storing_events_on_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ def register_user(self, command: Register) -> None:
User.register(command)


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(Registered, part_of=User)
test_domain.register(Register, part_of=User)
test_domain.register(UserCommandHandler, part_of=User)


@pytest.mark.eventstore
def test_persisting_events_on_commit(test_domain):
identifier = str(uuid4())
Expand Down

0 comments on commit 3dbcf66

Please sign in to comment.