Skip to content

Commit

Permalink
Eventing related changes:
Browse files Browse the repository at this point in the history
- Introduce `Metadata` Valueobject within events to store event metadata,
instead of relying on metadata in Messages
- Introduce `payload` property in Event to retrieve event payload / aggregate
data. This is necessary because there wiil be three major parts to each event
eventually - data (payload), metadata of event, and headers
- Introduce a `from_events` factory method to encapsulate the aspect of
initializing an aggregate from the first event, applying the event on it,
and subsequently applying the rest of the events. This keeps the logic of
aggregate initialization in a single place.
- Pass `Event` object to `_apply` method in Event Sourced Aggregates, instead
of a `Dict`. We were passing a Message payload, contents of which is not necessary
for aggregate data attributes.
- Clean up snapshot fetching and aggregate initialization because of the above
changes
  • Loading branch information
subhashb committed Jun 26, 2024
1 parent a056cd0 commit 3b30259
Show file tree
Hide file tree
Showing 20 changed files with 321 additions and 75 deletions.
2 changes: 0 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,5 +212,3 @@ nav:
- Community:
- community/index.md
- community/contributing.md
extra_css:
- stylesheets/extra.css
30 changes: 28 additions & 2 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import logging
from datetime import datetime, timezone

from protean.container import BaseContainer, OptionsMixin
from protean.core.value_object import BaseValueObject
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.fields import Field
from protean.reflection import _ID_FIELD_NAME, declared_fields
from protean.fields import DateTime, Field, String, ValueObject
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class

logger = logging.getLogger(__name__)


class Metadata(BaseValueObject):
kind = String(default="EVENT")
timestamp = DateTime(default=lambda: datetime.now(timezone.utc))


class BaseEvent(BaseContainer, OptionsMixin): # FIXME Remove OptionsMixin
"""Base Event class that all Events should inherit from.
Expand All @@ -23,6 +30,9 @@ def __new__(cls, *args, **kwargs):
raise NotSupportedError("BaseEvent cannot be instantiated")
return super().__new__(cls)

# Track Metadata
_metadata = ValueObject(Metadata, default=lambda: Metadata())

def __init_subclass__(subclass) -> None:
super().__init_subclass__()

Expand Down Expand Up @@ -80,6 +90,22 @@ def __track_id_field(subclass):
# No Identity fields declared
pass

@property
def payload(self):
"""Return the payload of the event."""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
if field_name not in {"_metadata"}
}

def __eq__(self, other) -> bool:
"""Equivalence check based only on data."""
if type(other) is not type(self):
return False

Check warning on line 105 in src/protean/core/event.py

View check run for this annotation

Codecov / codecov/patch

src/protean/core/event.py#L105

Added line #L105 was not covered by tests

return self.payload == other.payload


def domain_event_factory(element_cls, **kwargs):
element_cls = derive_element_class(element_cls, BaseEvent, **kwargs)
Expand Down
30 changes: 24 additions & 6 deletions src/protean/core/event_sourced_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import typing
from collections import defaultdict
from typing import Dict
from typing import List

from protean.container import BaseContainer, EventedMixin, IdentityMixin, OptionsMixin
from protean.core.event import BaseEvent
Expand Down Expand Up @@ -84,20 +84,38 @@ def __hash__(self):
# FIXME Add Object Class Type to hash
return hash(getattr(self, id_field(self).field_name))

def _apply(self, event_dict: Dict) -> None:
def _apply(self, event: BaseEvent) -> None:
"""Apply the event onto the aggregate by calling the appropriate projection.
Args:
event (BaseEvent): Event object to apply
"""
# FIXME Handle case of missing projection
for fn in self._projections[event_dict["type"]]:
# Reconstruct Event object
event_cls = self._events_cls_map[event_dict["type"]]
event = event_cls(**event_dict["data"])
event_name = fully_qualified_name(event.__class__)

# FIXME Handle case of missing projection method
if event_name not in self._projections:
raise NotImplementedError(

Check warning on line 98 in src/protean/core/event_sourced_aggregate.py

View check run for this annotation

Codecov / codecov/patch

src/protean/core/event_sourced_aggregate.py#L98

Added line #L98 was not covered by tests
f"No handler registered for event {event_name} in {self.__class__.__name__}"
)

for fn in self._projections[event_name]:
# Call event handler method
fn(self, event)
self._version += 1

@classmethod
def from_events(cls, events: List[BaseEvent]) -> "BaseEventSourcedAggregate":
"""Reconstruct an aggregate from a list of events."""
# Initialize the aggregate with the first event's payload and apply it
aggregate = cls(**events[0].payload)
aggregate._apply(events[0])

# Apply the rest of the events
for event in events[1:]:
aggregate._apply(event)

return aggregate


def apply(fn):
Expand Down
61 changes: 37 additions & 24 deletions src/protean/port/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,50 +136,63 @@ def load_aggregate(
Optional[BaseEventSourcedAggregate]: Return fully-formed aggregate when events exist,
or None.
"""
snapshot = self._read_last_message(
snapshot_message = self._read_last_message(
f"{part_of.meta_.stream_name}:snapshot-{identifier}"
)

if snapshot:
aggregate = part_of(**snapshot["data"])
position_in_snapshot = snapshot["data"]["_version"]
if snapshot_message:
# We have a snapshot, so initialize aggregate from snapshot
# and apply subsequent events
aggregate = part_of(**snapshot_message["data"])
position_in_snapshot = aggregate._version

events = deque(
event_stream = deque(
self._read(
f"{part_of.meta_.stream_name}-{identifier}",
position=position_in_snapshot + 1,
position=aggregate._version + 1,
)
)

events = []
for event_message in event_stream:
event = Message.from_dict(event_message).to_object()
aggregate._apply(event)
else:
events = deque(self._read(f"{part_of.meta_.stream_name}-{identifier}"))
# No snapshot, so initialize aggregate from events
event_stream = deque(
self._read(f"{part_of.meta_.stream_name}-{identifier}")
)

if not events:
if not event_stream:
return None

# Handle first event separately to initialize the aggregate
first_event = events.popleft()
aggregate = part_of(**first_event["data"])

# Also apply the first event in case a method has been specified
aggregate._apply(first_event)
aggregate._version += 1
events = []
for event_message in event_stream:
events.append(Message.from_dict(event_message).to_object())

# Apply all other events one-by-one
for event in events:
aggregate._apply(event)
aggregate._version += 1
aggregate = part_of.from_events(events)

####################################
# ADD SNAPSHOT IF BEYOND THRESHOLD #
####################################
# FIXME Delay creating snapshot or push to a background process
# If there are more events than SNAPSHOT_THRESHOLD, create a new snapshot
if (
snapshot
and len(events) > 1
snapshot_message
and len(event_stream) > 1
and (
events[-1]["position"] - position_in_snapshot
event_stream[-1]["position"] - position_in_snapshot
>= self.domain.config["SNAPSHOT_THRESHOLD"]
)
) or (
not snapshot and len(events) + 1 >= self.domain.config["SNAPSHOT_THRESHOLD"]
): # Account for the first event that was popped
not snapshot_message
and len(event_stream) >= self.domain.config["SNAPSHOT_THRESHOLD"]
):
# Snapshot is of type "SNAPSHOT" and contains only the aggregate's data
# (no metadata, so no event type)
# This makes reconstruction of the aggregate from the snapshot easier,
# and also avoids spurious data just to satisfy Metadata's structure
# and conditions.
self._write(
f"{part_of.meta_.stream_name}:snapshot-{identifier}",
"SNAPSHOT",
Expand Down
52 changes: 52 additions & 0 deletions tests/event/test_event_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import datetime
from uuid import uuid4

import pytest

from protean import BaseEvent, BaseEventSourcedAggregate
from protean.fields import String
from protean.fields.basic import Identifier


class User(BaseEventSourcedAggregate):
id = Identifier(identifier=True)
email = String()
name = String()

def login(self):
self.raise_(UserLoggedIn(user_id=self.id))


class UserLoggedIn(BaseEvent):
user_id = Identifier(identifier=True)


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(UserLoggedIn, part_of=User)
test_domain.init(traverse=False)


def test_event_metadata(test_domain):
user_id = str(uuid4())
user = User(id=user_id, email="<EMAIL>", name="<NAME>")

user.login()

assert len(user._events) == 1

event = user._events[0]
assert event._metadata is not None

assert event._metadata.kind == "EVENT"
assert isinstance(event._metadata.timestamp, datetime)
# assert event._metadata.id == f"test.user.v1.{user.user_id}.1"

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"timestamp": str(event._metadata.timestamp),
},
"user_id": event.user_id,
}
47 changes: 47 additions & 0 deletions tests/event/test_event_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from uuid import uuid4

import pytest

from protean import BaseEvent, BaseEventSourcedAggregate
from protean.fields import String
from protean.fields.basic import Identifier


class User(BaseEventSourcedAggregate):
id = Identifier(identifier=True)
email = String()
name = String()

def login(self):
self.raise_(UserLoggedIn(user_id=self.id))


class UserLoggedIn(BaseEvent):
user_id = Identifier(identifier=True)


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(UserLoggedIn, part_of=User)
test_domain.init(traverse=False)


def test_event_payload():
user_id = str(uuid4())
user = User(id=user_id, email="<EMAIL>", name="<NAME>")

user.login()
event = user._events[0]

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"timestamp": str(event._metadata.timestamp),
},
"user_id": event.user_id,
}

assert event.payload == {
"user_id": event.user_id,
}
4 changes: 4 additions & 0 deletions tests/event/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class UserAdded(BaseEvent):
assert event.email_address == "[email protected]"

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"timestamp": str(event._metadata.timestamp),
},
"email": {
"address": "[email protected]",
},
Expand Down
12 changes: 4 additions & 8 deletions tests/event_sourced_aggregates/test_applying_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from protean import BaseCommand, BaseEvent, BaseEventSourcedAggregate, apply
from protean.exceptions import IncorrectUsageError
from protean.fields import Identifier, String
from protean.utils.mixins import Message


class UserStatus(Enum):
Expand Down Expand Up @@ -76,18 +75,15 @@ def test_applying_events():
activated = UserActivated(user_id=identifier)
renamed = UserRenamed(user_id=identifier, name="Jane Doe")

user = User.register(**registered.to_dict())
user = User.register(**registered.payload)

msg_registered = Message.to_aggregate_event_message(user, registered)
user._apply(msg_registered.to_dict())
user._apply(registered)
assert user.status == UserStatus.INACTIVE.value

msg_activated = Message.to_aggregate_event_message(user, activated)
user._apply(msg_activated.to_dict())
user._apply(activated)
assert user.status == UserStatus.ACTIVE.value

msg_renamed = Message.to_aggregate_event_message(user, renamed)
user._apply(msg_renamed.to_dict())
user._apply(renamed)
assert user.name == "Jane Doe"


Expand Down
Loading

0 comments on commit 3b30259

Please sign in to comment.