Skip to content

Commit

Permalink
Support multi-level event sourced aggregates
Browse files Browse the repository at this point in the history
Event sourced aggregates so far were only limited to on aggregate, with no support
for underlying entities. This commit allows event sourced aggregates to behave
just like regular aggregates with multi-level heirarchies.
  • Loading branch information
subhashb committed Jul 19, 2024
1 parent eea02e5 commit 807cc4d
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 76 deletions.
74 changes: 0 additions & 74 deletions src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@

import copy
import inspect
import json
import logging
from collections import defaultdict
from typing import Any, Type, Union

from protean.exceptions import (
ConfigurationError,
InvalidDataError,
NotSupportedError,
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.reflection import id_field
from protean.utils import generate_identity

from .reflection import (
Expand Down Expand Up @@ -366,77 +363,6 @@ def _default_options(cls):
return []


class EventedMixin:
def __init__(self, *args, **kwargs) -> None:
"""Initialize an instance-level variable named `_events` to track events
raised in the aggregate cluster.
This method cannot have a super invocation, because we don't want it to
invoke BaseContainer's `__init__` method. But there is a conflict regarding
this between BaseAggregate and BaseEventSourcedAggregate. So this Mixin's
functionality has been replicated temporarily in BaseAggregate class.
Other mixins that are inherited by BaseEntity and BaseEventSourcedAggregate
work with `__init_subclass__`, and do not have this issue.
"""
super().__init__(*args, **kwargs)
self._events = []

def raise_(self, event) -> None:
"""Raise an event in the aggregate cluster.
The version of the aggregate is incremented with every event raised, which is true
in the case of Event Sourced Aggregates.
Event is immutable, so we clone a new event object from the event raised,
and add the enhanced metadata to it.
"""
# Verify that event is indeed associated with this aggregate
if event.meta_.part_of != self.__class__:
raise ConfigurationError(
f"Event `{event.__class__.__name__}` is not associated with "
f"aggregate `{self.__class__.__name__}`"
)

if not self.meta_.fact_events:
self._version += 1

identifier = getattr(self, id_field(self).field_name)

# Set Fact Event stream to be `<aggregate_stream>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream = f"{self.meta_.stream_category}-fact-{identifier}"
else:
stream = f"{self.meta_.stream_category}-{identifier}"

event_with_metadata = event.__class__(
event.to_dict(),
_expected_version=self._event_position,
_metadata={
"id": (f"{stream}-{self._version}"),
"type": event._metadata.type,
"fqn": event._metadata.fqn,
"kind": event._metadata.kind,
"stream": stream,
"origin_stream": event._metadata.origin_stream,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": self._version,
"payload_hash": hash(
json.dumps(
event.payload,
sort_keys=True,
)
),
},
)

# Increment the event position after generating event
self._event_position = self._event_position + 1

self._events.append(event_with_metadata)


class IdentityMixin:
def __init_subclass__(subclass) -> None:
super().__init_subclass__()
Expand Down
15 changes: 13 additions & 2 deletions src/protean/fields/association.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ def __set__(self, instance, value):
The `temp_cache` we set up here is eventually used by the `Repository` to determine
the changes to be persisted.
"""
if isinstance(value, dict):
value = self.to_cls(**value)

super().__set__(instance, value)

Expand Down Expand Up @@ -481,10 +483,19 @@ def __set__(self, instance, value):
"""This supports direct assignment of values to HasMany fields, like:
`order.items = [item1, item2, item3]`
"""
super().__set__(instance, value)
value = value if isinstance(value, list) else [value]

values = []
for item in value:
if isinstance(item, dict):
values.append(self.to_cls(**item))
else:
values.append(item)

super().__set__(instance, values)

if value is not None:
self.add(instance, value)
self.add(instance, values)

def add(self, instance, items) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import pytest

from protean import BaseAggregate, BaseEntity, BaseEvent, BaseValueObject, apply
from protean.fields import (
HasMany,
HasOne,
Identifier,
Integer,
List,
String,
ValueObject,
)


class Department(BaseEntity):
name = String(max_length=50)
dean = HasOne("Dean")


class Dean(BaseEntity):
name = String(max_length=50)
age = Integer(min_value=21)
office = HasOne("Office")


class Office(BaseEntity):
building = String(max_length=25)
room = Integer(min_value=1)


class OfficeVO(BaseValueObject):
id = Identifier()
building = String(max_length=25)
room = Integer(min_value=1)


class DeanVO(BaseValueObject):
id = Identifier()
name = String(max_length=50)
age = Integer(min_value=21)
office = ValueObject(OfficeVO)


class DepartmentVO(BaseValueObject):
id = Identifier()
name = String(max_length=50)
dean = ValueObject(DeanVO)


class UniversityCreated(BaseEvent):
id = Identifier(identifier=True)
_version = Integer()
name = String(max_length=50)
departments = List(content_type=ValueObject(DepartmentVO))


class NameChanged(BaseEvent):
id = Identifier(identifier=True)
name = String(max_length=50)


class University(BaseAggregate):
name = String(max_length=50)
departments = HasMany(Department)

def raise_event(self):
self.raise_(UniversityCreated(**self.to_dict()))

def change_name(self, name):
self.name = name
self.raise_(NameChanged(id=self.id, name=name))

@apply
def on_university_created(self, event: UniversityCreated):
# We are not doing anything here, because Protean applies
# the first event automatically, with from_events
pass

@apply
def on_name_changed(self, event: NameChanged):
self.name = event.name


@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(University, is_event_sourced=True)
test_domain.register(Department, part_of=University)
test_domain.register(Dean, part_of=Department)
test_domain.register(Office, part_of=Dean)
test_domain.register(UniversityCreated, part_of=University)
test_domain.register(NameChanged, part_of=University)
test_domain.init(traverse=False)


@pytest.fixture
def university(test_domain):
university = University(
name="MIT",
departments=[
Department(
name="Computer Science",
dean=Dean(
name="John Doe", age=45, office=Office(building="NE43", room=123)
),
),
Department(
name="Electrical Engineering",
dean=Dean(
name="Jane Smith", age=50, office=Office(building="NE43", room=124)
),
),
],
)
university.raise_event()
test_domain.repository_for(University).add(university)

return university


def test_aggregate_persistence(test_domain, university):
refreshed_university = test_domain.repository_for(University).get(university.id)

assert refreshed_university.id == university.id
assert refreshed_university.name == university.name
assert len(refreshed_university.departments) == 2
assert refreshed_university.departments[0].name == university.departments[0].name
assert refreshed_university.departments[1].name == university.departments[1].name
assert (
refreshed_university.departments[0].dean.name
== university.departments[0].dean.name
)
assert (
refreshed_university.departments[1].dean.name
== university.departments[1].dean.name
)
assert (
refreshed_university.departments[0].dean.office.building
== university.departments[0].dean.office.building
)
assert (
refreshed_university.departments[1].dean.office.building
== university.departments[1].dean.office.building
)
assert (
refreshed_university.departments[0].dean.office.room
== university.departments[0].dean.office.room
)
assert (
refreshed_university.departments[1].dean.office.room
== university.departments[1].dean.office.room
)


def test_aggregate_persistence_after_update(test_domain, university):
refreshed_university = test_domain.repository_for(University).get(university.id)

refreshed_university.change_name("Harvard")
test_domain.repository_for(University).add(refreshed_university)

refreshed_university = test_domain.repository_for(University).get(
refreshed_university.id
)

assert refreshed_university.name == "Harvard"

0 comments on commit 807cc4d

Please sign in to comment.