Event stream handling enhancements #445

Merged
17 commits merged on Jul 16, 2024
1 change: 1 addition & 0 deletions docs/guides/change-state/commands.md
@@ -9,6 +9,7 @@ carry intent and information necessary to perform a specific action.

## Key Facts

- Commands are unique throughout the domain.
- Commands are typically named using imperative verbs that clearly describe the intended action or change, e.g. `CreateOrder`, `UpdateCustomerAddress`, `ShipProduct`, and `CancelReservation`.
- Commands are typically related to an aggregate, because aggregates are the
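The imperative-naming convention described in this hunk can be illustrated with a minimal sketch. These are plain dataclasses, not Protean's actual command base class, and the field names are assumptions for the example:

```python
from dataclasses import dataclass

# Illustrative command shapes named with imperative verbs; the fields are
# assumptions for the sketch, not taken from any real domain model.
@dataclass(frozen=True)
class CreateOrder:
    order_id: str
    customer_id: str

@dataclass(frozen=True)
class CancelReservation:
    reservation_id: str

cmd = CreateOrder(order_id="ord-1", customer_id="cust-7")
print(cmd)  # CreateOrder(order_id='ord-1', customer_id='cust-7')
```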
4 changes: 2 additions & 2 deletions docs/guides/compose-a-domain/register-elements.md
@@ -21,8 +21,8 @@ depending upon the element being registered.
{! docs_src/guides/composing-a-domain/015.py !}
```

In the above example, the `User` aggregate's default stream name **`user`** is
customized to **`account`**.
In the above example, the `User` aggregate's default stream category **`user`**
is customized to **`account`**.

Review the [object model](../object-model.md) to understand
multiple ways to pass these options. Refer to each domain element's
12 changes: 8 additions & 4 deletions docs/guides/consume-state/event-handlers.md
@@ -51,10 +51,14 @@ Out[8]: {
## Configuration Options

- **`part_of`**: The aggregate to which the event handler is connected.
- **`stream_name`**: The event handler listens to events on this stream.
The stream name defaults to the aggregate's stream. This option comes handy
when the event handler belongs to an aggregate and needs to listen to another
aggregate's events.
- **`stream_category`**: The event handler listens to events on this stream
category. The stream category defaults to
[the category of the aggregate](../domain-definition/aggregates.md#stream_category)
associated with the handler.

An Event Handler can be part of one aggregate while listening on the stream
category of a different aggregate. This is the mechanism by which an aggregate
listens to another aggregate's events to synchronize its own state.
- **`source_stream`**: When specified, the event handler only consumes events
generated in response to events or commands from this original stream.
For example, `EmailNotifications` event handler listening to `OrderShipped`
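The relationship between handlers and stream categories can be sketched with a toy registry. The class names and registry layout here are illustrative, not Protean's internal API:

```python
from collections import defaultdict

# Handlers are grouped by the stream category they listen to. A handler that
# is part of one aggregate (Inventory) can still subscribe to another
# aggregate's category ("order") to sync its own state.
event_streams = defaultdict(set)

class OrderEventHandler: pass        # part of Order, default category "order"
class InventorySyncHandler: pass     # part of Inventory, overridden to "order"

event_streams["order"].update({OrderEventHandler, InventorySyncHandler})

print(sorted(cls.__name__ for cls in event_streams["order"]))
# ['InventorySyncHandler', 'OrderEventHandler']
```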
11 changes: 11 additions & 0 deletions docs/guides/domain-definition/aggregates.md
@@ -182,6 +182,17 @@ the aggregate.
`Customizing Persistence schemas` for more information.
<!-- FIXME Add link to customizing persistence schemas -->

### `stream_category`

The stream category to which the aggregate outputs events and from which it
processes commands. The category is automatically derived as the `underscore`
version of the aggregate's name, but can be overridden. E.g. `User` has `user`
as its automatic stream category, and `OrderItem` will have `order_item`.

The stream category is used by all elements in the aggregate's cluster,
including Command Handlers and Event Handlers, to determine the event or
command stream to listen to.
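The derivation rule described above can be sketched as follows. This uses a stdlib approximation of `inflection.underscore`, and the `override` parameter is an assumption mirroring the customization mechanism the docs describe:

```python
import re

def derive_stream_category(aggregate_name: str, override: str = "") -> str:
    # Convert CamelCase to snake_case, unless an explicit override is given
    if override:
        return override
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", aggregate_name).lower()

print(derive_stream_category("User"))        # user
print(derive_stream_category("OrderItem"))   # order_item
print(derive_stream_category("User", override="account"))  # account
```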

## Associations

Protean provides multiple options for Aggregates to weave object graphs with
6 changes: 6 additions & 0 deletions docs/patterns/creating-identities-early.md
@@ -0,0 +1,6 @@
An identity can be created very early in the lifecycle, right in the frontend.
It can be packaged into the API call, and the identifier can then be used when
constructing the command. This applies only to first-time creation; after that,
every subsequent command should contain the identifier anyway.
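The note above can be made concrete with a sketch: the client mints the identifier before the first command is sent, and later commands carry it. The payload shape and names are illustrative:

```python
import uuid

def create_order_payload(customer_id: str) -> dict:
    # Identity is created up front, before the aggregate exists anywhere
    return {
        "command": "CreateOrder",
        "order_id": str(uuid.uuid4()),
        "customer_id": customer_id,
    }

create_cmd = create_order_payload("cust-42")
# Every subsequent command reuses the same identifier:
cancel_cmd = {"command": "CancelOrder", "order_id": create_cmd["order_id"]}
assert cancel_cmd["order_id"] == create_cmd["order_id"]
```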
Empty file.
46 changes: 21 additions & 25 deletions src/protean/adapters/event_store/__init__.py
@@ -13,7 +13,6 @@
event_sourced_repository_factory,
)
from protean.exceptions import ConfigurationError, NotSupportedError
from protean.utils import fqn
from protean.utils.mixins import Message

if TYPE_CHECKING:
@@ -72,15 +71,15 @@ def _initialize(self) -> None:

def _initialize_event_streams(self):
for _, record in self.domain.registry.event_handlers.items():
stream_name = (
record.cls.meta_.stream_name
or record.cls.meta_.part_of.meta_.stream_name
stream_category = (
record.cls.meta_.stream_category
or record.cls.meta_.part_of.meta_.stream_category
)
self._event_streams[stream_name].add(record.cls)
self._event_streams[stream_category].add(record.cls)

def _initialize_command_streams(self):
for _, record in self.domain.registry.command_handlers.items():
self._command_streams[record.cls.meta_.part_of.meta_.stream_name].add(
self._command_streams[record.cls.meta_.part_of.meta_.stream_category].add(
record.cls
)

@@ -89,7 +88,7 @@ def repository_for(self, part_of):
part_of.__name__ + "Repository", (BaseEventSourcedRepository,), {}
)
repository_cls = event_sourced_repository_factory(
repository_cls, part_of=part_of
repository_cls, self.domain, part_of=part_of
)
return repository_cls(self.domain)

@@ -100,11 +99,11 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:

# Gather all handlers configured to run on this event
stream_handlers = self._event_streams.get(
event.meta_.part_of.meta_.stream_name, set()
event.meta_.part_of.meta_.stream_category, set()
)
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
if event.__class__.__type__ in stream_handler._handlers:
configured_stream_handlers.add(stream_handler)

return set.union(configured_stream_handlers, all_stream_handlers)
@@ -115,9 +114,9 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
f"Command `{command.__name__}` needs to be associated with an aggregate"
)

stream_name = command.meta_.part_of.meta_.stream_name
stream_category = command.meta_.part_of.meta_.stream_category

handler_classes = self._command_streams.get(stream_name, set())
handler_classes = self._command_streams.get(stream_category, set())

# No command handlers have been configured to run this command
if len(handler_classes) == 0:
@@ -129,7 +128,7 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
for handler_cls in handler_classes:
try:
handler_method = next(
iter(handler_cls._handlers[fqn(command.__class__)])
iter(handler_cls._handlers[command.__class__.__type__])
)
handler_methods.add((handler_cls, handler_method))
except StopIteration:
@@ -143,19 +142,19 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
return next(iter(handler_methods))[0] if handler_methods else None
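The resolution logic in this hunk can be condensed into a toy sketch: gather the handler methods registered for a command type and insist there is at most one. The registry layout and names here are assumptions, not Protean's internals:

```python
# Maps handler class name -> {command type -> set of handler method names}
handlers = {
    "OrderCommandHandler": {"CreateOrder": {"handle_create"}},
    "ShippingCommandHandler": {"ShipProduct": {"handle_ship"}},
}

def command_handler_for(command_type: str):
    # Collect (handler class, method) pairs registered for this command
    matches = [
        (cls_name, next(iter(methods[command_type])))
        for cls_name, methods in handlers.items()
        if command_type in methods
    ]
    if len(matches) > 1:
        raise RuntimeError(f"Command {command_type} cannot be handled by multiple handlers")
    return matches[0] if matches else None

print(command_handler_for("CreateOrder"))  # ('OrderCommandHandler', 'handle_create')
print(command_handler_for("UnknownCmd"))   # None
```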

def last_event_of_type(
self, event_cls: Type[BaseEvent], stream_name: str = None
self, event_cls: Type[BaseEvent], stream_category: str = None
) -> BaseEvent:
stream_name = stream_name or "$all"
stream_category = stream_category or "$all"
events = [
event
for event in self.domain.event_store.store._read(stream_name)
if event["type"] == fqn(event_cls)
for event in self.domain.event_store.store._read(stream_category)
if event["type"] == event_cls.__type__
]

return Message.from_dict(events[-1]).to_object() if len(events) > 0 else None

def events_of_type(
self, event_cls: Type[BaseEvent], stream_name: str = None
self, event_cls: Type[BaseEvent], stream_category: str = None
) -> List[BaseEvent]:
"""Read events of a specific type in a given stream.

@@ -164,16 +163,13 @@ def events_of_type(

If no stream is specified, events of the requested type will be retrieved from all streams.

:param event_cls: Class of the event type to be retrieved
:param stream_name: Stream from which events are to be retrieved
:type event_cls: BaseEvent Class
:type stream_name: String, optional, default is `None`
:param event_cls: Class of the event type to be retrieved. Subclass of `BaseEvent`.
:param stream_category: Stream from which events are to be retrieved. String, optional, default is `None`
:return: A list of events of `event_cls` type
:rtype: list
"""
stream_name = stream_name or "$all"
stream_category = stream_category or "$all"
return [
Message.from_dict(event).to_object()
for event in self.domain.event_store.store._read(stream_name)
if event["type"] == fqn(event_cls)
for event in self.domain.event_store.store._read(stream_category)
if event["type"] == event_cls.__type__
]
8 changes: 6 additions & 2 deletions src/protean/adapters/event_store/memory.py
@@ -74,9 +74,13 @@ def read(
)

if stream_name == "$all":
pass # Don't filter on stream name
pass # Don't filter on stream name or category
elif self.is_category(stream_name):
q = q.filter(stream_name__contains=stream_name)
# If filtering on a category, ensure the supplied stream name
# matches the category exactly.
# E.g. if the stream is 'user', only 'user' streams should match
# the category, and not even `user:command`
q = q.filter(stream_name__contains=f"{stream_name}-")
else:
q = q.filter(stream_name=stream_name)

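The category filter in the memory adapter hinges on the trailing hyphen: matching on `"user-"` keeps streams in the `user` category while excluding look-alikes such as `user:command`. A minimal sketch of the rule:

```python
def in_category(stream_name: str, category: str) -> bool:
    # Mirrors `stream_name__contains=f"{category}-"` in the memory adapter
    return f"{category}-" in stream_name

streams = ["user-123", "user:command-123", "account-9"]
print([s for s in streams if in_category(s, "user")])  # ['user-123']
```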
4 changes: 3 additions & 1 deletion src/protean/adapters/repository/__init__.py
@@ -59,7 +59,9 @@ def __delitem__(self, key):

def _construct_repository(self, part_of):
repository_cls = type(part_of.__name__ + "Repository", (BaseRepository,), {})
repository_cls = repository_factory(repository_cls, part_of=part_of)
repository_cls = repository_factory(
repository_cls, self.domain, part_of=part_of
)
return repository_cls

def _register_repository(self, part_of, repository_cls):
17 changes: 9 additions & 8 deletions src/protean/container.py
@@ -403,21 +403,22 @@ def raise_(self, event, fact_event=False) -> None:

identifier = getattr(self, id_field(self).field_name)

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
# Set Fact Event stream to be `<aggregate_stream_category>-fact-<identifier>`
if event.__class__.__name__.endswith("FactEvent"):
stream_name = f"{self.meta_.stream_name}-fact"
stream = f"{self.meta_.stream_category}-fact-{identifier}"
else:
stream_name = self.meta_.stream_name
stream = f"{self.meta_.stream_category}-{identifier}"

event_with_metadata = event.__class__(
event.to_dict(),
_expected_version=self._event_position,
_metadata={
"id": (f"{stream_name}-{identifier}-{self._version}"),
"type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
"kind": "EVENT",
"stream_name": f"{stream_name}-{identifier}",
"origin_stream_name": event._metadata.origin_stream_name,
"id": (f"{stream}-{self._version}"),
"type": event._metadata.type,
"fqn": event._metadata.fqn,
"kind": event._metadata.kind,
"stream": stream,
"origin_stream": event._metadata.origin_stream,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": self._version,
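The stream naming in `raise_` can be summarized in a sketch: fact events insert a `-fact` segment between the category and the identifier, and the message id appends the version. The helper names are assumptions for illustration:

```python
def stream_for(category: str, identifier: str, fact: bool = False) -> str:
    # Fact events get their own `-fact` sub-stream per aggregate instance
    return f"{category}-fact-{identifier}" if fact else f"{category}-{identifier}"

def message_id(stream: str, version: int) -> str:
    # The message id is the stream plus the aggregate version
    return f"{stream}-{version}"

print(stream_for("user", "42"))                 # user-42
print(stream_for("user", "42", fact=True))      # user-fact-42
print(message_id(stream_for("user", "42"), 0))  # user-42-0
```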
11 changes: 8 additions & 3 deletions src/protean/core/aggregate.py
@@ -76,7 +76,7 @@ def _default_options(cls):
("model", None),
("provider", "default"),
("schema_name", inflection.underscore(cls.__name__)),
("stream_name", inflection.underscore(cls.__name__)),
("stream_category", inflection.underscore(cls.__name__)),
]


@@ -143,8 +143,8 @@ def element_to_fact_event(element_cls):
return event_cls


def aggregate_factory(element_cls, **kwargs):
element_cls = derive_element_class(element_cls, BaseAggregate, **kwargs)
def aggregate_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseAggregate, **opts)

# Iterate through methods marked as `@invariant` and record them for later use
# `_invariants` is a dictionary initialized in BaseEntity.__init_subclass__
@@ -155,6 +155,11 @@ def aggregate_factory(element_cls, **kwargs):
) and hasattr(method, "_invariant"):
element_cls._invariants[method._invariant][method_name] = method

# Set stream category to be `domain_name::aggregate_name`
element_cls.meta_.stream_category = (
f"{domain.normalized_name}::{element_cls.meta_.stream_category}"
)

return element_cls


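The factory step above prefixes the category with the normalized domain name. A sketch of the resulting format (the example `domain_name` value is an assumption):

```python
def qualify_stream_category(domain_name: str, category: str) -> str:
    # `domain_name::aggregate_name`, as set in aggregate_factory above
    return f"{domain_name}::{category}"

print(qualify_stream_category("ecommerce", "order_item"))  # ecommerce::order_item
```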
4 changes: 2 additions & 2 deletions src/protean/core/application_service.py
@@ -31,5 +31,5 @@ def _default_options(cls):
return []


def application_service_factory(element_cls, **kwargs):
return derive_element_class(element_cls, BaseApplicationService, **kwargs)
def application_service_factory(element_cls, domain, **opts):
return derive_element_class(element_cls, BaseApplicationService, **opts)
45 changes: 39 additions & 6 deletions src/protean/core/command.py
@@ -7,9 +7,10 @@
ValidationError,
)
from protean.fields import Field, ValueObject
from protean.fields.association import Association, Reference
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class
from protean.utils import DomainObjects, derive_element_class, fqn


class BaseCommand(BaseContainer, OptionsMixin):
@@ -35,6 +36,26 @@ def __init_subclass__(subclass) -> None:
if not subclass.meta_.abstract:
subclass.__track_id_field()

# Use explicit version if specified, else default to "v1"
if not hasattr(subclass, "__version__"):
setattr(subclass, "__version__", "v1")

subclass.__validate_for_basic_field_types()

@classmethod
def __validate_for_basic_field_types(subclass):
for field_name, field_obj in fields(subclass).items():
# Commands can hold all kinds of fields, except associations
if isinstance(field_obj, (Reference, Association)):
raise IncorrectUsageError(
{
"_event": [
f"Commands cannot have associations. "
f"Remove {field_name} ({field_obj.__class__.__name__}) from class {subclass.__name__}"
]
}
)
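The validation above can be sketched independently of Protean: reject command fields that are associations or references. The marker classes below are stand-ins for `protean.fields.association`, not the real field types:

```python
class Reference: pass    # stand-in for protean.fields.association.Reference
class Association: pass  # stand-in for protean.fields.association.Association

def validate_command_fields(cls_name: str, fields: dict) -> None:
    # Commands may only carry basic fields, never associations
    for name, obj in fields.items():
        if isinstance(obj, (Reference, Association)):
            raise ValueError(
                f"Commands cannot have associations. "
                f"Remove {name} ({obj.__class__.__name__}) from class {cls_name}"
            )

validate_command_fields("PlaceOrder", {"order_id": object()})  # passes
try:
    validate_command_fields("PlaceOrder", {"customer": Reference()})
except ValueError as e:
    print(e)
```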

def __init__(self, *args, **kwargs):
try:
super().__init__(*args, finalize=False, **kwargs)
@@ -45,16 +66,17 @@ def __init__(self, *args, **kwargs):
else "v1"
)

origin_stream_name = None
origin_stream = None
if hasattr(g, "message_in_context"):
if g.message_in_context.metadata.kind == "EVENT":
origin_stream_name = g.message_in_context.stream_name
origin_stream = g.message_in_context.stream_name

# Value Objects are immutable, so we create a clone/copy and associate it
self._metadata = Metadata(
self._metadata.to_dict(), # Template
kind="COMMAND",
origin_stream_name=origin_stream_name,
fqn=fqn(self.__class__),
origin_stream=origin_stream,
version=version,
)

@@ -112,9 +134,20 @@ def __track_id_field(subclass):
# No Identity fields declared
pass

def to_dict(self):
"""Return data as a dictionary.

We need to override this method in Command, because `to_dict()` of `BaseContainer`
eliminates `_metadata`.
"""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
}


def command_factory(element_cls, **kwargs):
element_cls = derive_element_class(element_cls, BaseCommand, **kwargs)
def command_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseCommand, **opts)

if not element_cls.meta_.part_of and not element_cls.meta_.abstract:
raise IncorrectUsageError(