Rename aggregate's stream_name option to stream_category
subhashb committed Jul 13, 2024
1 parent 711f0df commit e7f2347
Showing 38 changed files with 130 additions and 106 deletions.
1 change: 1 addition & 0 deletions docs/guides/change-state/commands.md
@@ -9,6 +9,7 @@ carry intent and information necessary to perform a specific action.

## Key Facts

- Commands are unique throughout the domain.
- Commands are typically named using imperative verbs that clearly describe the intended action or change. E.g. CreateOrder, UpdateCustomerAddress,
ShipProduct, and CancelReservation.
- Commands are typically related to an aggregate, because aggregates are the
4 changes: 2 additions & 2 deletions docs/guides/compose-a-domain/register-elements.md
@@ -21,8 +21,8 @@ depending upon the element being registered.
{! docs_src/guides/composing-a-domain/015.py !}
```

In the above example, the `User` aggregate's default stream name **`user`** is
customized to **`account`**.
In the above example, the `User` aggregate's default stream category **`user`**
is customized to **`account`**.
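The resolution between the derived default and an explicit override can be sketched in plain Python. This is a hypothetical stand-in for Protean's option handling, not the framework's actual code:

```python
import re

def derive_stream_category(aggregate_name: str) -> str:
    """Default stream category: the underscore version of the aggregate's name."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", aggregate_name).lower()

def resolve_stream_category(aggregate_name: str, **options) -> str:
    """An explicit `stream_category` option wins over the derived default."""
    return options.get("stream_category") or derive_stream_category(aggregate_name)

print(resolve_stream_category("User"))                             # user
print(resolve_stream_category("User", stream_category="account"))  # account
```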

Review the [object model](../object-model.md) to understand
multiple ways to pass these options. Refer to each domain element's
12 changes: 8 additions & 4 deletions docs/guides/consume-state/event-handlers.md
@@ -51,10 +51,14 @@ Out[8]: {
## Configuration Options

- **`part_of`**: The aggregate to which the event handler is connected.
- **`stream_name`**: The event handler listens to events on this stream.
The stream name defaults to the aggregate's stream. This option comes handy
when the event handler belongs to an aggregate and needs to listen to another
aggregate's events.
- **`stream_category`**: The event handler listens to events on this stream
category. The stream category defaults to
[the category of the aggregate](../domain-definition/aggregates.md#stream_category)
associated with the handler.

An Event Handler can be part of one aggregate, yet be configured with the
stream category of a different aggregate. This is the mechanism by which an
aggregate listens to another aggregate's events to sync its own state.
- **`source_stream`**: When specified, the event handler only consumes events
generated in response to events or commands from this original stream.
For example, `EmailNotifications` event handler listening to `OrderShipped`
11 changes: 11 additions & 0 deletions docs/guides/domain-definition/aggregates.md
@@ -182,6 +182,17 @@ the aggregate.
`Customizing Persistence schemas` for more information.
<!-- FIXME Add link to customizing persistence schemas -->

### `stream_category`

The stream category to which the aggregate outputs events, and from which it
processes commands. The category is automatically derived as the `underscore`
version of the aggregate's name, but can be overridden. E.g. `User` has `user`
as its automatic stream category, and `OrderItem` will have `order_item`.

The stream category is used by all elements in the aggregate's cluster,
including Command Handlers and Event Handlers, to determine the event or
command stream to listen to.
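Both rules, name derivation and the cluster-wide fallback, can be sketched in plain Python. The `Meta` class and helper functions below are illustrative stand-ins, not Protean's internals:

```python
import re

def derive_stream_category(name: str) -> str:
    """'OrderItem' -> 'order_item': the underscore form of the class name."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

class Meta:
    """Bare-bones stand-in for an element's `meta_` options."""
    def __init__(self, stream_category=None, part_of=None):
        self.stream_category = stream_category
        self.part_of = part_of

def handler_stream_category(handler_meta: Meta) -> str:
    # A handler's own stream category wins; otherwise it falls back to the
    # category of the aggregate it is part of.
    return handler_meta.stream_category or handler_meta.part_of.stream_category

order_item = Meta(stream_category=derive_stream_category("OrderItem"))
handler = Meta(part_of=order_item)
print(handler_stream_category(handler))  # order_item
```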

## Associations

Protean provides multiple options for Aggregates to weave object graphs with
6 changes: 6 additions & 0 deletions docs/patterns/creating-identities-early.md
@@ -0,0 +1,6 @@
An identity can be created very early in an aggregate's lifecycle, right in
the frontend. The identifier can be packaged into the API call and used when
constructing the command. This applies only to first-time creation: after
that, every subsequent command should carry the identifier anyway.
35 changes: 16 additions & 19 deletions src/protean/adapters/event_store/__init__.py
@@ -71,15 +71,15 @@ def _initialize(self) -> None:

def _initialize_event_streams(self):
for _, record in self.domain.registry.event_handlers.items():
stream_name = (
record.cls.meta_.stream_name
or record.cls.meta_.part_of.meta_.stream_name
stream_category = (
record.cls.meta_.stream_category
or record.cls.meta_.part_of.meta_.stream_category
)
self._event_streams[stream_name].add(record.cls)
self._event_streams[stream_category].add(record.cls)

def _initialize_command_streams(self):
for _, record in self.domain.registry.command_handlers.items():
self._command_streams[record.cls.meta_.part_of.meta_.stream_name].add(
self._command_streams[record.cls.meta_.part_of.meta_.stream_category].add(
record.cls
)

@@ -99,7 +99,7 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:

# Gather all handlers configured to run on this event
stream_handlers = self._event_streams.get(
event.meta_.part_of.meta_.stream_name, set()
event.meta_.part_of.meta_.stream_category, set()
)
configured_stream_handlers = set()
for stream_handler in stream_handlers:
@@ -114,9 +114,9 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
f"Command `{command.__name__}` needs to be associated with an aggregate"
)

stream_name = command.meta_.part_of.meta_.stream_name
stream_category = command.meta_.part_of.meta_.stream_category

handler_classes = self._command_streams.get(stream_name, set())
handler_classes = self._command_streams.get(stream_category, set())

# No command handlers have been configured to run this command
if len(handler_classes) == 0:
@@ -142,19 +142,19 @@ def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandl
return next(iter(handler_methods))[0] if handler_methods else None

def last_event_of_type(
self, event_cls: Type[BaseEvent], stream_name: str = None
self, event_cls: Type[BaseEvent], stream_category: str = None
) -> BaseEvent:
stream_name = stream_name or "$all"
stream_category = stream_category or "$all"
events = [
event
for event in self.domain.event_store.store._read(stream_name)
for event in self.domain.event_store.store._read(stream_category)
if event["type"] == event_cls.__type__
]

return Message.from_dict(events[-1]).to_object() if len(events) > 0 else None

def events_of_type(
self, event_cls: Type[BaseEvent], stream_name: str = None
self, event_cls: Type[BaseEvent], stream_category: str = None
) -> List[BaseEvent]:
"""Read events of a specific type in a given stream.
@@ -163,16 +163,13 @@ def events_of_type(
If no stream is specified, events of the requested type will be retrieved from all streams.
:param event_cls: Class of the event type to be retrieved
:param stream_name: Stream from which events are to be retrieved
:type event_cls: BaseEvent Class
:type stream_name: String, optional, default is `None`
:param event_cls: Class of the event type to be retrieved. Subclass of `BaseEvent`.
:param stream_category: Stream from which events are to be retrieved. String, optional, default is `None`
:return: A list of events of `event_cls` type
:rtype: list
"""
stream_name = stream_name or "$all"
stream_category = stream_category or "$all"
return [
Message.from_dict(event).to_object()
for event in self.domain.event_store.store._read(stream_name)
for event in self.domain.event_store.store._read(stream_category)
if event["type"] == event_cls.__type__
]
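Stripped of the store plumbing, the filtering that `events_of_type` performs reduces to the following sketch over plain dict messages (the `stream` key and sample payloads are illustrative, not the adapter's actual `_read` interface):

```python
def events_of_type(messages, event_type, stream_category=None):
    """Filter raw event messages by type; `None` means read the $all stream."""
    category = stream_category or "$all"
    return [
        m for m in messages
        if m["type"] == event_type
        and (category == "$all" or m["stream"].split("-")[0] == category)
    ]

messages = [
    {"type": "UserRegistered", "stream": "user-1"},
    {"type": "UserLoggedIn", "stream": "user-1"},
    {"type": "OrderPlaced", "stream": "order-9"},
]
print(events_of_type(messages, "UserRegistered"))  # one matching message
```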
2 changes: 1 addition & 1 deletion src/protean/adapters/event_store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def read(
)

if stream_name == "$all":
pass # Don't filter on stream name
pass # Don't filter on stream name or category
elif self.is_category(stream_name):
# If filtering on category, ensure the supplied stream name
# is the only thing in the category.
4 changes: 2 additions & 2 deletions src/protean/container.py
@@ -405,9 +405,9 @@ def raise_(self, event, fact_event=False) -> None:

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream_name = f"{self.meta_.stream_name}-fact-{identifier}"
stream_name = f"{self.meta_.stream_category}-fact-{identifier}"
else:
stream_name = f"{self.meta_.stream_name}-{identifier}"
stream_name = f"{self.meta_.stream_category}-{identifier}"

event_with_metadata = event.__class__(
event.to_dict(),
2 changes: 1 addition & 1 deletion src/protean/core/aggregate.py
@@ -76,7 +76,7 @@ def _default_options(cls):
("model", None),
("provider", "default"),
("schema_name", inflection.underscore(cls.__name__)),
("stream_name", inflection.underscore(cls.__name__)),
("stream_category", inflection.underscore(cls.__name__)),
]


4 changes: 2 additions & 2 deletions src/protean/core/entity.py
@@ -459,9 +459,9 @@ def raise_(self, event) -> None:

# Set Fact Event stream to be `<aggregate_stream_name>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream_name = f"{self._root.meta_.stream_name}-fact-{identifier}"
stream_name = f"{self._root.meta_.stream_category}-fact-{identifier}"
else:
stream_name = f"{self._root.meta_.stream_name}-{identifier}"
stream_name = f"{self._root.meta_.stream_category}-{identifier}"

event_with_metadata = event.__class__(
event.to_dict(),
4 changes: 2 additions & 2 deletions src/protean/core/event_handler.py
@@ -27,14 +27,14 @@ def _default_options(cls):
return [
("part_of", None),
("source_stream", None),
("stream_name", part_of.meta_.stream_name if part_of else None),
("stream_category", part_of.meta_.stream_category if part_of else None),
]


def event_handler_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseEventHandler, **opts)

if not (element_cls.meta_.part_of or element_cls.meta_.stream_name):
if not (element_cls.meta_.part_of or element_cls.meta_.stream_category):
raise IncorrectUsageError(
{
"_entity": [
2 changes: 1 addition & 1 deletion src/protean/core/event_sourced_aggregate.py
@@ -49,7 +49,7 @@ def _default_options(cls):
("aggregate_cluster", None),
("auto_add_id_field", True),
("fact_events", False),
("stream_name", inflection.underscore(cls.__name__)),
("stream_category", inflection.underscore(cls.__name__)),
]

def __init_subclass__(subclass) -> None:
2 changes: 1 addition & 1 deletion src/protean/core/repository.py
@@ -249,7 +249,7 @@ def get(self, identifier) -> BaseAggregate:

# Fetch and sync events version
last_message = current_domain.event_store.store.read_last_message(
f"{aggregate.meta_.stream_name}-{identifier}"
f"{aggregate.meta_.stream_category}-{identifier}"
)
if last_message:
aggregate._event_position = last_message.position
4 changes: 3 additions & 1 deletion src/protean/domain/__init__.py
@@ -1073,7 +1073,9 @@ def _enrich_command(self, command: BaseCommand) -> BaseCommand:
else:
identifier = str(uuid4())

stream_name = f"{command.meta_.part_of.meta_.stream_name}:command-{identifier}"
stream_name = (
f"{command.meta_.part_of.meta_.stream_category}:command-{identifier}"
)

origin_stream_name = None
if hasattr(g, "message_in_context"):
8 changes: 4 additions & 4 deletions src/protean/port/event_store.py
@@ -124,7 +124,7 @@ def load_aggregate(
or None.
"""
snapshot_message = self._read_last_message(
f"{part_of.meta_.stream_name}:snapshot-{identifier}"
f"{part_of.meta_.stream_category}:snapshot-{identifier}"
)

if snapshot_message:
Expand All @@ -135,7 +135,7 @@ def load_aggregate(

event_stream = deque(
self._read(
f"{part_of.meta_.stream_name}-{identifier}",
f"{part_of.meta_.stream_category}-{identifier}",
position=aggregate._version + 1,
)
)
Expand All @@ -147,7 +147,7 @@ def load_aggregate(
else:
# No snapshot, so initialize aggregate from events
event_stream = deque(
self._read(f"{part_of.meta_.stream_name}-{identifier}")
self._read(f"{part_of.meta_.stream_category}-{identifier}")
)

if not event_stream:
@@ -181,7 +181,7 @@ def load_aggregate(
# and also avoids spurious data just to satisfy Metadata's structure
# and conditions.
self._write(
f"{part_of.meta_.stream_name}:snapshot-{identifier}",
f"{part_of.meta_.stream_category}:snapshot-{identifier}",
"SNAPSHOT",
aggregate.to_dict(),
)
6 changes: 3 additions & 3 deletions src/protean/server/engine.py
@@ -57,8 +57,8 @@ def __init__(self, domain, test_mode: bool = False, debug: bool = False) -> None
self._subscriptions[handler_name] = Subscription(
self,
handler_name,
record.cls.meta_.stream_name
or record.cls.meta_.part_of.meta_.stream_name,
record.cls.meta_.stream_category
or record.cls.meta_.part_of.meta_.stream_category,
record.cls,
origin_stream_name=record.cls.meta_.source_stream,
)
@@ -68,7 +68,7 @@ def __init__(self, domain, test_mode: bool = False, debug: bool = False) -> None
self._subscriptions[handler_name] = Subscription(
self,
handler_name,
f"{record.cls.meta_.part_of.meta_.stream_name}:command",
f"{record.cls.meta_.part_of.meta_.stream_category}:command",
record.cls,
)

10 changes: 5 additions & 5 deletions src/protean/server/subscription.py
@@ -27,7 +27,7 @@ def __init__(
self,
engine,
subscriber_id: str,
stream_name: str,
stream_category: str,
handler: Union[BaseEventHandler, BaseCommandHandler],
messages_per_tick: int = 10,
position_update_interval: int = 10,
@@ -40,7 +40,7 @@
Args:
engine: The Protean engine instance.
subscriber_id (str): The unique identifier for the subscriber.
stream_name (str): The name of the stream to subscribe to.
stream_category (str): The name of the stream to subscribe to.
handler (Union[BaseEventHandler, BaseCommandHandler]): The event or command handler.
messages_per_tick (int, optional): The number of messages to process per tick. Defaults to 10.
position_update_interval (int, optional): The interval at which to update the current position. Defaults to 10.
@@ -53,7 +53,7 @@
self.loop = engine.loop

self.subscriber_id = subscriber_id
self.stream_name = stream_name
self.stream_category = stream_category
self.handler = handler
self.messages_per_tick = messages_per_tick
self.position_update_interval = position_update_interval
@@ -218,7 +218,7 @@ def write_position(self, position: int) -> int:
{"position": position},
metadata={
"kind": MessageType.READ_POSITION.value,
"origin_stream_name": self.stream_name,
"origin_stream_name": self.stream_category,
},
)

@@ -259,7 +259,7 @@ async def get_next_batch_of_messages(self):
List[Message]: The next batch of messages to process.
"""
messages = self.store.read(
self.stream_name,
self.stream_category,
position=self.current_position + 1,
no_of_messages=self.messages_per_tick,
) # FIXME Implement filtering
2 changes: 1 addition & 1 deletion tests/aggregate/events/test_aggregate_event_streams.py
@@ -53,7 +53,7 @@ def register_elements(test_domain):

class TestDeltaEvents:
def test_aggregate_stream_name(self):
assert User.meta_.stream_name == "user"
assert User.meta_.stream_category == "user"

def test_event_metadata(self):
user = User(name="John Doe", email="[email protected]")
2 changes: 1 addition & 1 deletion tests/command/test_command_meta.py
@@ -68,7 +68,7 @@ def test_command_associated_with_aggregate(test_domain):

@pytest.mark.eventstore
def test_command_associated_with_aggregate_with_custom_stream_name(test_domain):
test_domain.register(User, stream_name="foo")
test_domain.register(User, stream_category="foo")
test_domain.register(Register, part_of=User)
test_domain.init(traverse=False)

6 changes: 3 additions & 3 deletions tests/event/test_event_part_of_resolution.py
@@ -21,12 +21,12 @@ def register_elements(test_domain):
test_domain.register(UserLoggedIn, part_of="User")


def test_event_does_not_have_stream_name_before_domain_init():
def test_event_does_not_have_stream_category_before_domain_init():
assert isinstance(UserLoggedIn.meta_.part_of, str)


def test_event_has_stream_name_after_domain_init(test_domain):
def test_event_has_stream_category_after_domain_init(test_domain):
test_domain.init(traverse=False)

assert UserLoggedIn.meta_.part_of == User
assert UserLoggedIn.meta_.part_of.meta_.stream_name == "user"
assert UserLoggedIn.meta_.part_of.meta_.stream_category == "user"
2 changes: 1 addition & 1 deletion tests/event/test_raising_events.py
@@ -19,7 +19,7 @@ class UserLoggedIn(BaseEvent):

@pytest.mark.eventstore
def test_raising_event(test_domain):
test_domain.register(User, stream_name="authentication")
test_domain.register(User, stream_category="authentication")
test_domain.register(UserLoggedIn, part_of=User)
test_domain.init(traverse=False)
