Skip to content

Commit

Permalink
Introduce id into Event metadata
Browse files Browse the repository at this point in the history
`id` serves as the primary identifier of an event. It's format is
<domain-name>.<event-class-name>.<event-version>.<aggregate-id>.<aggregate-version>

This is applicable for Event Sourced Aggregates, and may undergo slight modifications
for regular Aggregates.

The changes in tests are largely because events were being added to the event store
directly. We cannot add plain events anymore because their metadata is being
enriched in the Aggregate's `raise_` method.
  • Loading branch information
subhashb committed Jun 27, 2024
1 parent 6b49016 commit 774889d
Show file tree
Hide file tree
Showing 26 changed files with 469 additions and 328 deletions.
26 changes: 25 additions & 1 deletion src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.globals import current_domain
from protean.reflection import id_field
from protean.utils import generate_identity

from .reflection import _FIELDS, _ID_FIELD_NAME, attributes, declared_fields, fields
Expand Down Expand Up @@ -366,7 +368,29 @@ def __init__(self, *args, **kwargs) -> None:
self._events = []

def raise_(self, event) -> None:
self._events.append(event)
"""Raise an event in the aggregate cluster.
The version of the aggregate is incremented with every event raised, which is true
in the case of Event Sourced Aggregates.
Event is immutable, so we clone a new event object from the event raised,
and add the enhanced metadata to it.
"""
self._version += 1

identifier = getattr(self, id_field(self).field_name)

event_with_metadata = event.__class__(
event.to_dict(),
_metadata={
"id": f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}.{identifier}.{self._version}",
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": self._version,
},
)

self._events.append(event_with_metadata)


class IdentityMixin:
Expand Down
16 changes: 14 additions & 2 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@
from protean.container import BaseContainer, OptionsMixin
from protean.core.value_object import BaseValueObject
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.fields import DateTime, Field, String, ValueObject
from protean.fields import DateTime, Field, Integer, String, ValueObject
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class

logger = logging.getLogger(__name__)


class Metadata(BaseValueObject):
kind = String(default="EVENT")
# Unique identifier of the Event
# Format is <domain-name>.<event-class-name>.<event-version>.<aggregate-id>.<aggregate-version>
id = String()

# Time of event generation
timestamp = DateTime(default=lambda: datetime.now(timezone.utc))

# Version of the event
# Can be overridden with `__version__` class attr in Event class definition
version = String(default="v1")

# Sequence of the event in the aggregate
# This is the version of the aggregate *after* the time of event generation,
# so it will always be one more than the version in the event store.
sequence_id = Integer()


class BaseEvent(BaseContainer, OptionsMixin): # FIXME Remove OptionsMixin
"""Base Event class that all Events should inherit from.
Expand Down
5 changes: 5 additions & 0 deletions src/protean/core/event_sourced_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def __init__(self, domain) -> None:
self._domain = domain

def add(self, aggregate: BaseEventSourcedAggregate) -> None:
if aggregate is None:
raise IncorrectUsageError(
{"_entity": ["Aggregate object to persist is invalid"]}
)

# `add` is typically invoked in handler methods in Command Handlers and Event Handlers, which are
# enclosed in a UoW automatically. Therefore, if there is a UoW in progress, we can assume
# that it is the active session. If not, we will start a new UoW and commit it after the operation
Expand Down
4 changes: 0 additions & 4 deletions src/protean/port/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ def append_aggregate_event(
expected_version=message.expected_version,
)

# Increment aggregate's version as we process events
# to correctly handle expected version
aggregate._version += 1

return position

def append(self, object: Union[BaseEvent, BaseCommand]) -> int:
Expand Down
3 changes: 2 additions & 1 deletion src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ def to_aggregate_event_message(
**cls.derived_metadata(MessageType.EVENT.value),
# schema_version=event.meta_.version, # FIXME Maintain version for event
),
expected_version=aggregate._version, # FIXME Maintain version for Aggregates
# Expect the previous version
expected_version=event._metadata.sequence_id - 1,
)

def to_object(self) -> Union[BaseEvent, BaseCommand]:
Expand Down
16 changes: 7 additions & 9 deletions tests/event/test_event_metadata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from uuid import uuid4

import pytest
Expand Down Expand Up @@ -39,16 +39,14 @@ def test_event_has_metadata_value_object():
def test_metadata_defaults():
event = UserLoggedIn(user_id=str(uuid4()))
assert event._metadata is not None
assert event._metadata.kind == "EVENT"
assert isinstance(event._metadata.timestamp, datetime)


def test_metadata_can_be_overridden():
# Setting `kind` breaks the system elsewhere, but suffices for this test
event = UserLoggedIn(user_id=str(uuid4()), _metadata={"kind": "FOO"})
now_timestamp = datetime.now() - timedelta(hours=1)
event = UserLoggedIn(user_id=str(uuid4()), _metadata={"timestamp": now_timestamp})
assert event._metadata is not None
assert event._metadata.kind == "FOO"
assert isinstance(event._metadata.timestamp, datetime)
assert event._metadata.timestamp == now_timestamp


class TestEventMetadataVersion:
Expand Down Expand Up @@ -80,15 +78,15 @@ def test_event_metadata():
event = user._events[0]
assert event._metadata is not None

assert event._metadata.kind == "EVENT"
assert isinstance(event._metadata.timestamp, datetime)
# assert event._metadata.id == f"test.user.v1.{user.user_id}.1"
assert event._metadata.id == f"Test.User.v1.{user.id}.0"

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"id": f"Test.User.v1.{user.id}.0",
"timestamp": str(event._metadata.timestamp),
"version": "v1",
"sequence_id": 0,
},
"user_id": event.user_id,
}
37 changes: 37 additions & 0 deletions tests/event/test_event_metadata_id_and_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from uuid import uuid4

import pytest

from protean import BaseEvent, BaseEventSourcedAggregate
from protean.fields import String
from protean.fields.basic import Identifier


class User(BaseEventSourcedAggregate):
id = Identifier(identifier=True)
email = String()
name = String()

def login(self):
self.raise_(UserLoggedIn(user_id=self.id))


class UserLoggedIn(BaseEvent):
user_id = Identifier(identifier=True)


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.register(UserLoggedIn, part_of=User)
test_domain.init(traverse=False)


def test_event_is_generated_with_unique_id():
identifier = str(uuid4())
user = User(id=identifier, email="[email protected]", name="Foo Bar")
user.login()

event = user._events[0]
assert event._metadata.id == f"Test.User.v1.{identifier}.0"
assert event._metadata.sequence_id == 0
3 changes: 2 additions & 1 deletion tests/event/test_event_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ def test_event_payload():

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"id": f"Test.User.v1.{user_id}.0",
"timestamp": str(event._metadata.timestamp),
"version": "v1",
"sequence_id": 0,
},
"user_id": event.user_id,
}
Expand Down
26 changes: 15 additions & 11 deletions tests/event/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ class UserAdded(BaseEvent):
assert event.email == Email(address="[email protected]")
assert event.email_address == "[email protected]"

assert event.to_dict() == {
"_metadata": {
"kind": "EVENT",
"timestamp": str(event._metadata.timestamp),
"version": "v1",
},
"email": {
"address": "[email protected]",
},
"name": "John Doe",
}
assert (
event.to_dict()
== {
"_metadata": {
"id": None, # ID is none because the event is not being raised in the proper way (with `_raise`)
"timestamp": str(event._metadata.timestamp),
"version": "v1",
"sequence_id": None, # Sequence is unknown because event is not being raised as part of an aggregate
},
"email": {
"address": "[email protected]",
},
"name": "John Doe",
}
)

def test_that_domain_event_can_be_reconstructed_from_dict_enclosing_vo(self):
class Email(BaseValueObject):
Expand Down
26 changes: 26 additions & 0 deletions tests/event_sourced_repository/test_add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest

from protean import BaseEventSourcedAggregate
from protean.exceptions import IncorrectUsageError
from protean.fields import Identifier, String


class User(BaseEventSourcedAggregate):
id = Identifier(identifier=True)
email = String()
name = String()


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(User)
test_domain.init(traverse=False)


def test_exception_on_empty_aggregate_object(test_domain):
with pytest.raises(IncorrectUsageError) as exception:
test_domain.repository_for(User).add(None)

assert exception.value.messages == {
"_entity": ["Aggregate object to persist is invalid"]
}
33 changes: 22 additions & 11 deletions tests/event_store/test_appending_aggregate_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class User(BaseEventSourcedAggregate):
name = String()
status = String(default="INACTIVE")

@classmethod
def register(cls, id, email, name):
user = User(id=id, email=email, name=name)
user.raise_(Registered(id=id, email=email, name=name))

return user

def activate(self):
self.raise_(Activated(id=self.id))

def rename(self, name):
self.name = name
self.raise_(Renamed(id=self.id, name=name))

@apply
def registered(self, _: Registered) -> None:
self.status = "INACTIVE"
Expand All @@ -55,9 +69,8 @@ def register_elements(test_domain):
@pytest.mark.eventstore
def test_appending_messages_to_aggregate(test_domain):
identifier = str(uuid4())
event = Registered(id=identifier, email="[email protected]")
user = User(id=identifier, email="[email protected]")
test_domain.event_store.store.append_aggregate_event(user, event)
user = User.register(id=identifier, email="[email protected]", name="John Doe")
test_domain.event_store.store.append_aggregate_event(user, user._events[0])

messages = test_domain.event_store.store._read("user")

Expand All @@ -67,22 +80,20 @@ def test_appending_messages_to_aggregate(test_domain):
@pytest.mark.eventstore
def test_version_increment_on_new_event(test_domain):
identifier = str(uuid4())
event1 = Registered(id=identifier, email="[email protected]")

user = User(**event1.payload)
test_domain.event_store.store.append_aggregate_event(user, event1)
user = User.register(id=identifier, email="[email protected]", name="John Doe")
test_domain.event_store.store.append_aggregate_event(user, user._events[0])

events = test_domain.event_store.store._read(f"user-{identifier}")
assert events[0]["position"] == 0

event2 = Activated(id=identifier)
test_domain.event_store.store.append_aggregate_event(user, event2)
user.activate()
test_domain.event_store.store.append_aggregate_event(user, user._events[1])

events = test_domain.event_store.store._read(f"user-{identifier}")
assert events[-1]["position"] == 1

event3 = Renamed(id=identifier, name="Jane Doe")
test_domain.event_store.store.append_aggregate_event(user, event3)
user.rename(name="John Doe 2")
test_domain.event_store.store.append_aggregate_event(user, user._events[2])

events = test_domain.event_store.store._read(f"user-{identifier}")
assert events[-1]["position"] == 2
Loading

0 comments on commit 774889d

Please sign in to comment.