Skip to content

Commit

Permalink
Stream enhancements - Part 2
Browse files Browse the repository at this point in the history
Changes:
- Ensure stream names have aggregate id within them
- Use `stream_name` in Event/Command metadata as-is for persistence into event store
- Base handlers and handler retreival on stream name
- Ensure commands always belong to an aggregate cluster
- Remove `stream_name` option from events and and commands
  All elements in an aggregate cluster should use the aggregate's stream name
- Enhance `Domain.process` to enrich command before processing
  • Loading branch information
subhashb committed Jul 7, 2024
1 parent 978ece9 commit b600eff
Show file tree
Hide file tree
Showing 34 changed files with 301 additions and 340 deletions.
14 changes: 8 additions & 6 deletions src/protean/adapters/event_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
all_stream_handlers = self._event_streams.get("$all", set())

# Gather all handlers configured to run on this event
stream_handlers = self._event_streams.get(event.meta_.stream_name, set())
stream_handlers = self._event_streams.get(
event.meta_.part_of.meta_.stream_name, set()
)
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
Expand All @@ -102,12 +104,12 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
return set.union(configured_stream_handlers, all_stream_handlers)

def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]:
stream_name = command.meta_.stream_name or (
command.meta_.part_of.meta_.stream_name if command.meta_.part_of else None
)
if not command.meta_.part_of:
raise ConfigurationError(
f"Command `{command.__name__}` needs to be associated with an aggregate"
)

if not stream_name:
return None
stream_name = command.meta_.part_of.meta_.stream_name

handler_classes = self._command_streams.get(stream_name, set())

Expand Down
1 change: 0 additions & 1 deletion src/protean/adapters/event_store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any, Dict, List

from protean.core.aggregate import BaseAggregate
from protean.core.event import Metadata
from protean.core.repository import BaseRepository
from protean.globals import current_domain
from protean.port.event_store import BaseEventStore
Expand Down
19 changes: 16 additions & 3 deletions src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from typing import Any, Type, Union

from protean.exceptions import (
ConfigurationError,
InvalidDataError,
NotSupportedError,
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.globals import g
from protean.reflection import id_field
from protean.utils import generate_identity

Expand Down Expand Up @@ -388,18 +388,31 @@ def raise_(self, event, fact_event=False) -> None:
Event is immutable, so we clone a new event object from the event raised,
and add the enhanced metadata to it.
"""
# Verify that event is indeed associated with this aggregate
if event.meta_.part_of != self.__class__:
raise ConfigurationError(
f"Event `{event.__class__.__name__}` is not associated with "
f"aggregate `{self.__class__.__name__}`"
)

if not fact_event:
self._version += 1

identifier = getattr(self, id_field(self).field_name)

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream_name = f"{self.meta_.stream_name}-fact"
else:
stream_name = self.meta_.stream_name

event_with_metadata = event.__class__(
event.to_dict(),
_metadata={
"id": (f"{self.meta_.stream_name}-{identifier}-{self._version}"),
"id": (f"{stream_name}-{identifier}-{self._version}"),
"type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
"kind": "EVENT",
"stream_name": self.meta_.stream_name,
"stream_name": f"{stream_name}-{identifier}",
"origin_stream_name": event._metadata.origin_stream_name,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
Expand Down
28 changes: 11 additions & 17 deletions src/protean/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)
from protean.fields import Field, ValueObject
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class


Expand Down Expand Up @@ -64,6 +64,15 @@ def __init__(self, *args, **kwargs):
except ValidationError as exception:
raise InvalidDataError(exception.messages)

@property
def payload(self):
"""Return the payload of the event."""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
if field_name not in {"_metadata"}
}

def __setattr__(self, name, value):
if not hasattr(self, "_initialized") or not self._initialized:
return super().__setattr__(name, value)
Expand All @@ -78,22 +87,10 @@ def __setattr__(self, name, value):

@classmethod
def _default_options(cls):
part_of = (
getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None
)

# This method is called during class import, so we cannot use part_of if it
# is still a string. We ignore it for now, and resolve `stream_name` later
# when the domain has resolved references.
# FIXME A better mechanism would be to not set stream_name here, unless explicitly
# specified, and resolve it during `domain.init()`
part_of = None if isinstance(part_of, str) else part_of

return [
("abstract", False),
("aggregate_cluster", None),
("part_of", None),
("stream_name", part_of.meta_.stream_name if part_of else None),
]

@classmethod
Expand All @@ -119,10 +116,7 @@ def __track_id_field(subclass):
def command_factory(element_cls, **kwargs):
element_cls = derive_element_class(element_cls, BaseCommand, **kwargs)

if (
not (element_cls.meta_.part_of or element_cls.meta_.stream_name)
and not element_cls.meta_.abstract
):
if not element_cls.meta_.part_of and not element_cls.meta_.abstract:
raise IncorrectUsageError(
{
"_command": [
Expand Down
13 changes: 12 additions & 1 deletion src/protean/core/command_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,23 @@ def command_handler_factory(element_cls, **kwargs):
}
)

# Throw error if target_cls is not associated with an aggregate
if not method._target_cls.meta_.part_of:
raise IncorrectUsageError(
{
"_command_handler": [
f"Command `{method._target_cls.__name__}` in Command Handler `{element_cls.__name__}` "
"is not associated with an aggregate"
]
}
)

# Associate Command with the handler's stream
# Order of preference:
# 1. Stream name defined in command
# 2. Stream name derived from aggregate associated with command handler
method._target_cls.meta_.stream_name = (
method._target_cls.meta_.stream_name
method._target_cls.meta_.part_of.meta_.stream_name
or element_cls.meta_.part_of.meta_.stream_name
)

Expand Down
13 changes: 9 additions & 4 deletions src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError
from protean.fields import Auto, HasMany, Reference, ValueObject
from protean.fields.association import Association
from protean.globals import g
from protean.reflection import (
_FIELDS,
attributes,
Expand Down Expand Up @@ -443,17 +442,23 @@ def raise_(self, event) -> None:
# in the same edit session
event_number = len(self._root._events) + 1

identifier = getattr(self, id_field(self).field_name)
identifier = getattr(self._root, id_field(self._root).field_name)

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream_name = f"{self._root.meta_.stream_name}-fact"
else:
stream_name = self._root.meta_.stream_name

event_with_metadata = event.__class__(
event.to_dict(),
_metadata={
"id": (
f"{self._root.meta_.stream_name}-{identifier}-{aggregate_version}.{event_number}"
f"{stream_name}-{identifier}-{aggregate_version}.{event_number}"
),
"type": f"{self._root.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
"kind": "EVENT",
"stream_name": self._root.meta_.stream_name,
"stream_name": f"{stream_name}-{identifier}",
"origin_stream_name": event._metadata.origin_stream_name,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
Expand Down
17 changes: 1 addition & 16 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,10 @@ def __setattr__(self, name, value):

@classmethod
def _default_options(cls):
part_of = (
getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None
)

# This method is called during class import, so we cannot use part_of if it
# is still a string. We ignore it for now, and resolve `stream_name` later
# when the domain has resolved references.
# FIXME A better mechanism would be to not set stream_name here, unless explicitly
# specified, and resolve it during `domain.init()`
part_of = None if isinstance(part_of, str) else part_of

return [
("abstract", False),
("aggregate_cluster", None),
("part_of", None),
("stream_name", part_of.meta_.stream_name if part_of else None),
]

@classmethod
Expand Down Expand Up @@ -189,10 +177,7 @@ def to_dict(self):
def domain_event_factory(element_cls, **kwargs):
element_cls = derive_element_class(element_cls, BaseEvent, **kwargs)

if (
not (element_cls.meta_.part_of or element_cls.meta_.stream_name)
and not element_cls.meta_.abstract
):
if not element_cls.meta_.part_of and not element_cls.meta_.abstract:
raise IncorrectUsageError(
{
"_event": [
Expand Down
55 changes: 48 additions & 7 deletions src/protean/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
"""

import inspect
import json
import logging
import sys
from collections import defaultdict
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from uuid import uuid4

from werkzeug.datastructures import ImmutableDict

Expand All @@ -27,7 +29,8 @@
NotSupportedError,
)
from protean.fields import HasMany, HasOne, Reference, ValueObject
from protean.reflection import declared_fields, has_fields
from protean.globals import g
from protean.reflection import declared_fields, has_fields, id_field
from protean.utils import (
CommandProcessing,
DomainObjects,
Expand Down Expand Up @@ -804,11 +807,7 @@ def _generate_fact_event_classes(self):
for _, element in self.registry._elements[element_type.value].items():
if element.cls.meta_.fact_events:
event_cls = element_to_fact_event(element.cls)
self.register(
event_cls,
part_of=element.cls,
stream_name=element.cls.meta_.stream_name + "-fact",
)
self.register(event_cls, part_of=element.cls)

######################
# Element Decorators #
Expand Down Expand Up @@ -935,6 +934,47 @@ def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None:
#####################
# Handling Commands #
#####################
def _enrich_command(self, command: BaseCommand) -> BaseCommand:
# Enrich Command
identifier = None
identity_field = id_field(command)
if identity_field:
identifier = getattr(command, identity_field.field_name)
else:
identifier = str(uuid4())

stream_name = f"{command.meta_.part_of.meta_.stream_name}:command-{identifier}"

origin_stream_name = None
if hasattr(g, "message_in_context"):
if g.message_in_context.metadata.kind == "EVENT":
origin_stream_name = g.message_in_context.stream_name

command_with_metadata = command.__class__(
command.to_dict(),
_metadata={
"id": (str(uuid4())),
"type": (
f"{command.meta_.part_of.__class__.__name__}.{command.__class__.__name__}."
f"{command._metadata.version}"
),
"kind": "EVENT",
"stream_name": stream_name,
"origin_stream_name": origin_stream_name,
"timestamp": command._metadata.timestamp,
"version": command._metadata.version,
"sequence_id": None,
"payload_hash": hash(
json.dumps(
command.payload,
sort_keys=True,
)
),
},
)

return command_with_metadata

def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[Any]:
"""Process command and return results based on specified preference.
Expand All @@ -950,7 +990,8 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A
Returns:
Optional[Any]: Returns either the command handler's return value or nothing, based on preference.
"""
position = self.event_store.store.append(command)
command_with_metadata = self._enrich_command(command)
position = self.event_store.store.append(command_with_metadata)

if (
not asynchronous
Expand Down
Loading

0 comments on commit b600eff

Please sign in to comment.