Skip to content

Commit

Permalink
Stream Enhancements
Browse files Browse the repository at this point in the history
Contains:
- Unify metadata in Event and Command objects
- Remove `MessageMetadata` class
- Build Message metadata directly from event/command object
- Introduce `kind`, `stream_name`, and `origin_stream_name` to metadata
- Initialize event store in `init`
- Don't build stream name in different places. Use `.meta_stream_name`
  • Loading branch information
subhashb committed Jul 4, 2024
1 parent af95302 commit 8302be0
Show file tree
Hide file tree
Showing 46 changed files with 473 additions and 233 deletions.
11 changes: 11 additions & 0 deletions docs/core-concepts/building-blocks/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ consistent and informed.

## Facts

### Events are always associated with aggregates. { data-toc-label="Linked to Aggregates" }
An event is always associated to the aggregate that emits it. Events of an
event type are emitted to the aggregate stream that the event type is
associated with.

### Events are essentially Data Transfer Objects (DTO). { data-toc-label="Data Transfer Objects" }
They can only hold simple fields and Value Objects.

Expand Down Expand Up @@ -97,6 +102,11 @@ or notifying external consumers for choice events, like `LowInventoryAlert`.
They are also appropriate for composing a custom view of the state based on
events (for example in Command Query Resource Separation).

#### Multiple Event Types

Aggregates usually emit events of multiple delta event types. Each event
is individually associated with the aggregate.

### Fact Events

A fact event encloses the entire state of the aggregate at that specific point
Expand All @@ -112,6 +122,7 @@ multiple delta event types, which can be risky and error-prone, especially as
data schemas evolve and change over time. Instead, they rely on the owning
service to compute and produce a fully detailed fact event.


## Persistence

### Events are stored in an Event Store. { data-toc-label="Event Store" }
Expand Down
2 changes: 1 addition & 1 deletion docs/stylesheets/extra.css
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ p a, article ul li a {
}

/* Primary color and tighter space */
.md-typeset h1, .md-typeset h2, .md-typeset h3 {
.md-typeset h1, .md-typeset h2, .md-typeset h3, .md-typeset h4 {
color: var(--md-primary-fg-color);
letter-spacing: -.025em;
}
Expand Down
66 changes: 23 additions & 43 deletions src/protean/adapters/event_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,32 @@ def __init__(self, domain):

@property
def store(self):
if self._event_store is None:
self._initialize()

return self._event_store

def _initialize(self):
if not self._event_store:
logger.debug("Initializing Event Store...")

configured_event_store = self.domain.config["event_store"]
if configured_event_store and isinstance(configured_event_store, dict):
event_store_full_path = EVENT_STORE_PROVIDERS[
configured_event_store["provider"]
]
event_store_module, event_store_class = event_store_full_path.rsplit(
".", maxsplit=1
)
logger.debug("Initializing Event Store...")

configured_event_store = self.domain.config["event_store"]
if configured_event_store and isinstance(configured_event_store, dict):
event_store_full_path = EVENT_STORE_PROVIDERS[
configured_event_store["provider"]
]
event_store_module, event_store_class = event_store_full_path.rsplit(
".", maxsplit=1
)

event_store_cls = getattr(
importlib.import_module(event_store_module), event_store_class
)
event_store_cls = getattr(
importlib.import_module(event_store_module), event_store_class
)

store = event_store_cls(self.domain, configured_event_store)
else:
raise ConfigurationError(
"Configure at least one event store in the domain"
)
store = event_store_cls(self.domain, configured_event_store)
else:
raise ConfigurationError("Configure at least one event store in the domain")

Check warning on line 54 in src/protean/adapters/event_store/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/protean/adapters/event_store/__init__.py#L54

Added line #L54 was not covered by tests

self._event_store = store
self._event_store = store

self._initialize_event_streams()
self._initialize_command_streams()
self._initialize_event_streams()
self._initialize_command_streams()

return self._event_store

Expand All @@ -85,9 +79,6 @@ def _initialize_command_streams(self):
)

def repository_for(self, part_of):
if self._event_store is None:
self._initialize()

repository_cls = type(
part_of.__name__ + "Repository", (BaseEventSourcedRepository,), {}
)
Expand All @@ -97,20 +88,12 @@ def repository_for(self, part_of):
return repository_cls(self.domain)

def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
if self._event_streams is None:
self._initialize_event_streams()

"""Return all handlers configured to run on the given event."""
# Gather handlers configured to run on all events
all_stream_handlers = self._event_streams.get("$all", set())

# Get the Aggregate's stream_name
aggregate_stream_name = None
if event.meta_.aggregate_cluster:
aggregate_stream_name = event.meta_.aggregate_cluster.meta_.stream_name

stream_name = event.meta_.stream_name or aggregate_stream_name
stream_handlers = self._event_streams.get(stream_name, set())

# Gather all handlers that are configured to run on this event
# Gather all handlers configured to run on this event
stream_handlers = self._event_streams.get(event.meta_.stream_name, set())
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
Expand All @@ -119,9 +102,6 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
return set.union(configured_stream_handlers, all_stream_handlers)

def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]:
if self._command_streams is None:
self._initialize_command_streams()

stream_name = command.meta_.stream_name or (
command.meta_.part_of.meta_.stream_name if command.meta_.part_of else None
)
Expand Down
5 changes: 3 additions & 2 deletions src/protean/adapters/event_store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
from typing import Any, Dict, List

from protean.core.aggregate import BaseAggregate
from protean.core.event import Metadata
from protean.core.repository import BaseRepository
from protean.globals import current_domain
from protean.port.event_store import BaseEventStore
from protean.utils.mixins import MessageMetadata, MessageRecord
from protean.utils.mixins import MessageRecord


class MemoryMessage(BaseAggregate, MessageRecord):
Expand Down Expand Up @@ -52,7 +53,7 @@ def write(
position=next_position,
type=message_type,
data=data,
metadata=MessageMetadata(**metadata) if metadata else None,
metadata=metadata,
time=datetime.now(UTC),
)
)
Expand Down
11 changes: 6 additions & 5 deletions src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ValidationError,
)
from protean.fields import Auto, Field, FieldBase, ValueObject
from protean.globals import current_domain
from protean.globals import g
from protean.reflection import id_field
from protean.utils import generate_identity

Expand Down Expand Up @@ -396,10 +396,11 @@ def raise_(self, event, fact_event=False) -> None:
event_with_metadata = event.__class__(
event.to_dict(),
_metadata={
"id": (
f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}"
f".{identifier}.{self._version}"
),
"id": (f"{self.meta_.stream_name}-{identifier}-{self._version}"),
"type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
"kind": "EVENT",
"stream_name": self.meta_.stream_name,
"origin_stream_name": event._metadata.origin_stream_name,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": self._version,
Expand Down
45 changes: 42 additions & 3 deletions src/protean/core/command.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from protean.container import BaseContainer, OptionsMixin
from protean.core.event import Metadata
from protean.exceptions import (
IncorrectUsageError,
InvalidDataError,
NotSupportedError,
ValidationError,
)
from protean.fields import Field
from protean.fields import Field, ValueObject
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields
from protean.utils import DomainObjects, derive_element_class

Expand All @@ -24,6 +26,9 @@ def __new__(cls, *args, **kwargs):
raise NotSupportedError("BaseCommand cannot be instantiated")
return super().__new__(cls)

# Track Metadata
_metadata = ValueObject(Metadata, default=lambda: Metadata()) # pragma: no cover

def __init_subclass__(subclass) -> None:
super().__init_subclass__()

Expand All @@ -32,7 +37,30 @@ def __init_subclass__(subclass) -> None:

def __init__(self, *args, **kwargs):
try:
super().__init__(*args, **kwargs)
super().__init__(*args, finalize=False, **kwargs)

version = (
self.__class__.__version__
if hasattr(self.__class__, "__version__")
else "v1"
)

origin_stream_name = None
if hasattr(g, "message_in_context"):
if g.message_in_context.metadata.kind == "EVENT":
origin_stream_name = g.message_in_context.stream_name

# Value Objects are immutable, so we create a clone/copy and associate it
self._metadata = Metadata(
self._metadata.to_dict(), # Template
kind="COMMAND",
origin_stream_name=origin_stream_name,
version=version,
)

# Finally lock the event and make it immutable
self._initialized = True

except ValidationError as exception:
raise InvalidDataError(exception.messages)

Expand All @@ -50,11 +78,22 @@ def __setattr__(self, name, value):

@classmethod
def _default_options(cls):
part_of = (
getattr(cls.meta_, "part_of") if hasattr(cls.meta_, "part_of") else None
)

# This method is called during class import, so we cannot use part_of if it
# is still a string. We ignore it for now, and resolve `stream_name` later
# when the domain has resolved references.
# FIXME A better mechanism would be to not set stream_name here, unless explicitly
# specified, and resolve it during `domain.init()`
part_of = None if isinstance(part_of, str) else part_of

return [
("abstract", False),
("aggregate_cluster", None),
("part_of", None),
("stream_name", None),
("stream_name", part_of.meta_.stream_name if part_of else None),
]

@classmethod
Expand Down
9 changes: 6 additions & 3 deletions src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError
from protean.fields import Auto, HasMany, Reference, ValueObject
from protean.fields.association import Association
from protean.globals import current_domain
from protean.globals import g
from protean.reflection import (
_FIELDS,
attributes,
Expand Down Expand Up @@ -449,9 +449,12 @@ def raise_(self, event) -> None:
event.to_dict(),
_metadata={
"id": (
f"{current_domain.name}.{self.__class__.__name__}.{event._metadata.version}"
f".{identifier}.{aggregate_version}.{event_number}"
f"{self._root.meta_.stream_name}-{identifier}-{aggregate_version}.{event_number}"
),
"type": f"{self._root.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
"kind": "EVENT",
"stream_name": self._root.meta_.stream_name,
"origin_stream_name": event._metadata.origin_stream_name,
"timestamp": event._metadata.timestamp,
"version": event._metadata.version,
"sequence_id": f"{aggregate_version}.{event_number}",
Expand Down
41 changes: 36 additions & 5 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from protean.core.value_object import BaseValueObject
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.fields import DateTime, Field, Integer, String, ValueObject
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class

Expand All @@ -17,6 +18,20 @@ class Metadata(BaseValueObject):
# Format is <domain-name>.<event-class-name>.<event-version>.<aggregate-id>.<aggregate-version>
id = String()

# Type of the event
# Format is <domain-name>.<event-class-name>.<event-version>
type = String()

# Kind of the object
# Can be one of "EVENT", "COMMAND"
kind = String()

# Name of the stream to which the event/command is written
stream_name = String()

# Name of the stream that originated this event/command
origin_stream_name = String()

# Time of event generation
timestamp = DateTime(default=lambda: datetime.now(timezone.utc))

Expand Down Expand Up @@ -114,11 +129,27 @@ def __track_id_field(subclass):
def __init__(self, *args, **kwargs):
super().__init__(*args, finalize=False, **kwargs)

if hasattr(self.__class__, "__version__"):
# Value Objects are immutable, so we create a clone/copy and associate it
self._metadata = Metadata(
self._metadata.to_dict(), version=self.__class__.__version__
)
version = (
self.__class__.__version__
if hasattr(self.__class__, "__version__")
else "v1"
)

origin_stream_name = None
if hasattr(g, "message_in_context"):
if (
g.message_in_context.metadata.kind == "COMMAND"
and g.message_in_context.metadata.origin_stream_name is not None
):
origin_stream_name = g.message_in_context.metadata.origin_stream_name

# Value Objects are immutable, so we create a clone/copy and associate it
self._metadata = Metadata(
self._metadata.to_dict(), # Template
kind="EVENT",
origin_stream_name=origin_stream_name,
version=version,
)

# Finally lock the event and make it immutable
self._initialized = True
Expand Down
1 change: 1 addition & 0 deletions src/protean/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def _initialize(self):
self.providers._initialize()
self.caches._initialize()
self.brokers._initialize()
self.event_store._initialize()

def make_config(self):
"""Used to construct the config; invoked by the Domain constructor."""
Expand Down
Loading

0 comments on commit 8302be0

Please sign in to comment.