Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assimilate Event Sourcing functionality into regular aggregates #447

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/protean/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__version__ = "0.12.1"

from .core.aggregate import BaseAggregate, atomic_change
from .core.aggregate import BaseAggregate, apply, atomic_change
from .core.application_service import BaseApplicationService
from .core.command import BaseCommand
from .core.command_handler import BaseCommandHandler
Expand All @@ -9,7 +9,6 @@
from .core.entity import BaseEntity, invariant
from .core.event import BaseEvent
from .core.event_handler import BaseEventHandler
from .core.event_sourced_aggregate import BaseEventSourcedAggregate, apply
from .core.model import BaseModel
from .core.queryset import Q, QuerySet
from .core.repository import BaseRepository
Expand All @@ -33,7 +32,6 @@
"BaseEntity",
"BaseEvent",
"BaseEventHandler",
"BaseEventSourcedAggregate",
"BaseModel",
"BaseRepository",
"BaseSerializer",
Expand Down
74 changes: 0 additions & 74 deletions src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@

import copy
import inspect
import json
import logging
from collections import defaultdict
from typing import Any, Type, Union

from protean.exceptions import (
ConfigurationError,
InvalidDataError,
NotSupportedError,
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.reflection import id_field
from protean.utils import generate_identity

from .reflection import (
Expand Down Expand Up @@ -366,77 +363,6 @@ def _default_options(cls):
return []


class EventedMixin:
    """Mixin that tracks domain events raised within an aggregate cluster.

    Maintains an instance-level ``_events`` list and offers ``raise_`` to
    record events enriched with stream metadata.
    """

    def __init__(self, *args, **kwargs) -> None:
        # NOTE: BaseAggregate replicates this initialization because of a
        # super() conflict between BaseAggregate and BaseEventSourcedAggregate;
        # other mixins rely on `__init_subclass__` and do not have this issue.
        super().__init__(*args, **kwargs)
        self._events = []

    def raise_(self, event, fact_event=False) -> None:
        """Record `event` against this aggregate cluster.

        Every non-fact event bumps the aggregate version (as expected for
        Event Sourced Aggregates). Because events are immutable, a fresh copy
        of the event carrying the enriched metadata is created and appended
        to ``_events``.
        """
        # The event must belong to this aggregate class.
        if event.meta_.part_of != self.__class__:
            raise ConfigurationError(
                f"Event `{event.__class__.__name__}` is not associated with "
                f"aggregate `{self.__class__.__name__}`"
            )

        if not fact_event:
            self._version += 1

        identifier = getattr(self, id_field(self).field_name)

        # Fact events flow on a dedicated `<category>-fact-<id>` stream.
        if event.__class__.__name__.endswith("FactEvent"):
            stream = f"{self.meta_.stream_category}-fact-{identifier}"
        else:
            stream = f"{self.meta_.stream_category}-{identifier}"

        enriched = event.__class__(
            event.to_dict(),
            _expected_version=self._event_position,
            _metadata={
                "id": (f"{stream}-{self._version}"),
                "type": event._metadata.type,
                "fqn": event._metadata.fqn,
                "kind": event._metadata.kind,
                "stream": stream,
                "origin_stream": event._metadata.origin_stream,
                "timestamp": event._metadata.timestamp,
                "version": event._metadata.version,
                "sequence_id": self._version,
                "payload_hash": hash(
                    json.dumps(event.payload, sort_keys=True)
                ),
            },
        )

        # The event position advances only after the event has been generated.
        self._event_position = self._event_position + 1

        self._events.append(enriched)


class IdentityMixin:
def __init_subclass__(subclass) -> None:
super().__init_subclass__()
Expand Down
126 changes: 119 additions & 7 deletions src/protean/core/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Aggregate Functionality and Classes"""

import functools
import inspect
import logging
import typing
from collections import defaultdict
from typing import List

from protean.core.entity import BaseEntity
from protean.core.event import BaseEvent
from protean.core.value_object import BaseValueObject
from protean.exceptions import NotSupportedError
from protean.fields import HasMany, HasOne, Integer, List, Reference, ValueObject
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.fields import HasMany, HasOne, Integer, Reference, ValueObject
from protean.fields import List as ProteanList
from protean.reflection import fields
from protean.utils import DomainObjects, derive_element_class, inflection
from protean.utils import DomainObjects, derive_element_class, fqn, inflection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,6 +61,23 @@ def __new__(cls, *args, **kwargs):
# a single aggregate update could have triggered multiple events.
_event_position = -1

def __init_subclass__(subclass) -> None:
    """Event-sourcing support: give every subclass its own registries.

    ``_projections`` (event fqn -> set of projection methods) and
    ``_events_cls_map`` (event fqn -> event class) are attached per subclass
    here rather than in ``__init__``: a collection created in ``__init__``
    would be the same object across all subclasses, defeating its purpose.
    """
    super().__init_subclass__()

    subclass._projections = defaultdict(set)
    subclass._events_cls_map = {}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand All @@ -73,12 +95,51 @@ def _default_options(cls):
("aggregate_cluster", None),
("auto_add_id_field", True),
("fact_events", False),
("is_event_sourced", False),
("model", None),
("provider", "default"),
("schema_name", inflection.underscore(cls.__name__)),
("stream_category", inflection.underscore(cls.__name__)),
]

def _apply(self, event: BaseEvent) -> None:
    """Event-Sourcing Functionality

    Mutate aggregate state by invoking every projection registered for the
    given event's class.

    Args:
        event (BaseEvent): Event object to apply

    Raises:
        NotImplementedError: when no projection is registered for the event.
    """
    event_name = fqn(event.__class__)

    if event_name not in self._projections:
        raise NotImplementedError(
            f"No handler registered for event `{event_name}` in `{self.__class__.__name__}`"
        )

    # NOTE(review): the version is bumped once per registered projection, not
    # once per event — with multiple projections for a single event the
    # version advances by more than one. Confirm this is intended.
    for projection in self._projections[event_name]:
        projection(self, event)
        self._version += 1

@classmethod
def from_events(cls, events: List[BaseEvent]) -> "BaseAggregate":
    """Event-Sourcing Functionality

    Reconstruct an aggregate by replaying a list of events in order.

    The first event's payload seeds the aggregate's initial state; every
    event (including the first) is then applied through `_apply` so the
    registered projections run and the version advances.

    Args:
        events (List[BaseEvent]): Ordered events to replay. Must be non-empty.

    Returns:
        BaseAggregate: The reconstructed aggregate instance.

    Raises:
        ValueError: if `events` is empty (previously surfaced as an opaque
            IndexError from `events[0]`).
    """
    if not events:
        raise ValueError(
            f"Cannot reconstruct `{cls.__name__}` from an empty list of events"
        )

    # Seed initial state from the first event's payload...
    aggregate = cls(**events[0].payload)

    # ...then replay every event, the first included, through the projections.
    for event in events:
        aggregate._apply(event)

    return aggregate


def element_to_fact_event(element_cls):
"""Convert an Element to a Fact Event.
Expand All @@ -94,9 +155,7 @@ def element_to_fact_event(element_cls):
The target class of associations is constructed as the Value Object.
"""
# Gather all fields defined in the element, except References.
# We ignore references in event payloads. We also ignore
# the `_next_version` field because it is a temporary in-flight
# field used to track the next version of the aggregate.
# We ignore references in event payloads.
attrs = {
key: value
for key, value in fields(element_cls).items()
Expand All @@ -108,7 +167,7 @@ def element_to_fact_event(element_cls):
if isinstance(value, HasOne):
attrs[key] = element_to_fact_event(value.to_cls)
elif isinstance(value, HasMany):
attrs[key] = List(content_type=element_to_fact_event(value.to_cls))
attrs[key] = ProteanList(content_type=element_to_fact_event(value.to_cls))

# If we are dealing with an Entity, we convert it to a Value Object
# and return it.
Expand Down Expand Up @@ -160,6 +219,16 @@ def aggregate_factory(element_cls, domain, **opts):
f"{domain.normalized_name}::{element_cls.meta_.stream_category}"
)

# Event-Sourcing Functionality
# Iterate through methods marked as `@apply` and construct a projections map
methods = inspect.getmembers(element_cls, predicate=inspect.isroutine)
for method_name, method in methods:
if not (
method_name.startswith("__") and method_name.endswith("__")
) and hasattr(method, "_event_cls"):
element_cls._projections[fqn(method._event_cls)].add(method)
element_cls._events_cls_map[fqn(method._event_cls)] = method._event_cls

return element_cls


Expand All @@ -178,3 +247,46 @@ def __exit__(self, *args):
# Validate on exit to trigger invariant checks
self.aggregate._disable_invariant_checks = False
self.aggregate._postcheck()


def apply(fn):
    """Event-Sourcing Functionality

    Decorator that marks an aggregate method as a projection for an event.

    The decorated method must accept exactly one argument (besides `self`)
    annotated with a `BaseEvent` subclass; that class is recorded on the
    wrapper as `_event_cls` so the aggregate factory can build the
    projections map.

    Raises:
        IncorrectUsageError: if the method declares too many annotated
            arguments, or none of them is annotated with a `BaseEvent`
            subclass.
    """
    # Compute hints once; the original called `typing.get_type_hints` twice.
    hints = typing.get_type_hints(fn)

    # At most the event argument plus an optional return annotation is
    # allowed; anything more means extra arguments were declared.
    if len(hints) > 2:
        raise IncorrectUsageError(
            {
                "_entity": [
                    f"Handler method `{fn.__name__}` has incorrect number of arguments"
                ]
            }
        )

    try:
        _event_cls = next(
            iter(
                {
                    value
                    for value in hints.values()
                    if inspect.isclass(value) and issubclass(value, BaseEvent)
                }
            )
        )
    except StopIteration:
        # Suppress the StopIteration context; the usage error is the real story.
        raise IncorrectUsageError(
            {
                "_entity": [
                    f"Apply method `{fn.__name__}` should accept an argument annotated with the Event class"
                ]
            }
        ) from None

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Bug fix: propagate the wrapped method's return value and accept
        # keyword arguments — the original discarded both.
        return fn(*args, **kwargs)

    wrapper._event_cls = _event_cls

    return wrapper
50 changes: 33 additions & 17 deletions src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,21 +440,6 @@ def raise_(self, event) -> None:
f" aggregate `{self._root.__class__.__name__}`"
)

# Events are sometimes raised from within the aggregate, well-before persistence.
# In that case, the aggregate's next version has to be considered in events,
# because we want to associate the event with the version that will be persisted.
#
# Other times, an event is generated after persistence, like in the case of
# fact events. In this case, the aggregate's current version and next version
# will be the same.
#
# So we simply take the latest version, among `_version` and `_next_version`.
aggregate_version = max(self._root._version, self._root._next_version)

# This is just a counter to uniquely gather all events generated
# in the same edit session
event_number = len(self._root._events) + 1

identifier = getattr(self._root, id_field(self._root).field_name)

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
Expand All @@ -463,19 +448,50 @@ def raise_(self, event) -> None:
else:
stream = f"{self._root.meta_.stream_category}-{identifier}"

if self._root.meta_.is_event_sourced:
# The version of the aggregate is incremented with every event raised, which is true
# in the case of Event Sourced Aggregates.
#
# Except for Fact Events. Fact Events are raised after the aggregate has been persisted,
if not event.__class__.__name__.endswith("FactEvent"):
self._version += 1

event_identity = f"{stream}-{self._version}"
sequence_id = f"{self._version}"
else:
# Events are sometimes raised from within the aggregate, well-before persistence.
# In that case, the aggregate's next version has to be considered in events,
# because we want to associate the event with the version that will be persisted.
#
# Other times, an event is generated after persistence, like in the case of
# fact events. In this case, the aggregate's current version and next version
# will be the same.
#
# So we simply take the latest version, among `_version` and `_next_version`.
aggregate_version = max(self._root._version, self._root._next_version)

# This is just a counter to uniquely gather all events generated
# in the same edit session
event_number = len(self._root._events) + 1

event_identity = f"{stream}-{aggregate_version}.{event_number}"
sequence_id = f"{aggregate_version}.{event_number}"

# Event is immutable, so we clone a new event object from the event raised,
# and add the enhanced metadata to it.
event_with_metadata = event.__class__(
event.to_dict(),
_expected_version=self._root._event_position,
_metadata={
"id": (f"{stream}-{aggregate_version}.{event_number}"),
"id": event_identity,
"type": event._metadata.type,
"fqn": event._metadata.fqn,
"kind": event._metadata.kind,
"stream": stream,
"origin_stream": event._metadata.origin_stream,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": f"{aggregate_version}.{event_number}",
"sequence_id": sequence_id,
"payload_hash": hash(
json.dumps(
event.payload,
Expand Down
Loading