Stream Enhancements #441

Merged · 4 commits · Jul 7, 2024
4 changes: 3 additions & 1 deletion .coveragerc
@@ -18,4 +18,6 @@ omit =
show_missing = true
precision = 2
omit = *migrations*

exclude_lines =
pragma: no cover
if TYPE_CHECKING:
4 changes: 2 additions & 2 deletions .vscode/launch.json
@@ -66,7 +66,7 @@
"module": "pytest",
"justMyCode": false,
"args": [
"tests/adapters/model/sqlalchemy_model/postgresql/test_array_datatype.py::test_array_data_type_association",
"tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py::test_persistence_of_json_with_array_data",
"--postgresql"
]
},
@@ -108,7 +108,7 @@
"module": "pytest",
"justMyCode": false,
"args": [
"tests/adapters/model/elasticsearch_model/tests.py::TestDefaultModel::test_dynamically_constructed_model_attributes",
"tests/adapters/model/elasticsearch_model/tests.py::TestModelWithVO::test_conversion_from_model_to_entity",
"--elasticsearch"
]
},
11 changes: 11 additions & 0 deletions docs/core-concepts/building-blocks/events.md
@@ -7,6 +7,11 @@ consistent and informed.

## Facts

### Events are always associated with aggregates. { data-toc-label="Linked to Aggregates" }
An event is always associated with the aggregate that emits it. Events of a
given event type are emitted to the stream of the aggregate that the event
type is associated with (a concrete sketch follows this file's diff).

### Events are essentially Data Transfer Objects (DTO). { data-toc-label="Data Transfer Objects" }
They can only hold simple fields and Value Objects.

@@ -97,6 +102,11 @@ or notifying external consumers for choice events, like `LowInventoryAlert`.
They are also appropriate for composing a custom view of the state based on
events (for example in Command Query Resource Separation).

#### Multiple Event Types

Aggregates usually emit events of multiple delta event types. Each event
is individually associated with the aggregate.

### Fact Events

A fact event encloses the entire state of the aggregate at that specific point
@@ -112,6 +122,7 @@ multiple delta event types, which can be risky and error-prone, especially as
data schemas evolve and change over time. Instead, they rely on the owning
service to compute and produce a fully detailed fact event.


## Persistence

### Events are stored in an Event Store. { data-toc-label="Event Store" }
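> **Note:** the "linked to aggregates" fact documented above is easiest to see in user code. A minimal sketch, assuming Protean's decorator-based registration and the `raise_` API; the `Order`/`OrderShipped` names and the `Domain(__name__)` constructor are illustrative, not part of this diff:

```python
from protean import Domain
from protean.fields import Identifier, String

domain = Domain(__name__)


@domain.aggregate
class Order:
    customer_id = Identifier()
    status = String(default="PENDING")

    def ship(self):
        self.status = "SHIPPED"
        # The event is raised against this aggregate, so it is emitted
        # to the `Order` aggregate's stream.
        self.raise_(OrderShipped(order_id=self.id, customer_id=self.customer_id))


@domain.event(part_of=Order)
class OrderShipped:
    order_id = Identifier(required=True)
    customer_id = Identifier()
```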
2 changes: 1 addition & 1 deletion docs/stylesheets/extra.css
@@ -23,7 +23,7 @@ p a, article ul li a {
}

/* Primary color and tighter space */
.md-typeset h1, .md-typeset h2, .md-typeset h3 {
.md-typeset h1, .md-typeset h2, .md-typeset h3, .md-typeset h4 {
color: var(--md-primary-fg-color);
letter-spacing: -.025em;
}
106 changes: 47 additions & 59 deletions src/protean/adapters/event_store/__init__.py
@@ -1,7 +1,9 @@
from __future__ import annotations

import importlib
import logging
from collections import defaultdict
from typing import List, Optional, Type
from typing import TYPE_CHECKING, DefaultDict, List, Optional, Set, Type

from protean import BaseEvent, BaseEventHandler
from protean.core.command import BaseCommand
@@ -14,6 +16,10 @@
from protean.utils import fqn
from protean.utils.mixins import Message

if TYPE_CHECKING:
from protean.domain import Domain
from protean.port.event_store import BaseEventStore

logger = logging.getLogger(__name__)

EVENT_STORE_PROVIDERS = {
@@ -24,51 +30,47 @@

class EventStore:
def __init__(self, domain):
self.domain = domain
self._event_store = None
self._event_streams = None
self._command_streams = None
self.domain: Domain = domain
self._event_store: BaseEventStore = None
self._event_streams: DefaultDict[str, Set[BaseEventHandler]] = defaultdict(set)
self._command_streams: DefaultDict[str, Set[BaseCommandHandler]] = defaultdict(
set
)

@property
def store(self):
if self._event_store is None:
self._initialize()

return self._event_store

def _initialize(self):
if not self._event_store:
logger.debug("Initializing Event Store...")

configured_event_store = self.domain.config["event_store"]
if configured_event_store and isinstance(configured_event_store, dict):
event_store_full_path = EVENT_STORE_PROVIDERS[
configured_event_store["provider"]
]
event_store_module, event_store_class = event_store_full_path.rsplit(
".", maxsplit=1
)
def _initialize_event_store(self) -> BaseEventStore:
configured_event_store = self.domain.config["event_store"]
event_store_full_path = EVENT_STORE_PROVIDERS[
configured_event_store["provider"]
]
event_store_module, event_store_class = event_store_full_path.rsplit(
".", maxsplit=1
)

event_store_cls = getattr(
importlib.import_module(event_store_module), event_store_class
)
event_store_cls = getattr(
importlib.import_module(event_store_module), event_store_class
)

store = event_store_cls(self.domain, configured_event_store)
else:
raise ConfigurationError(
"Configure at least one event store in the domain"
)
store = event_store_cls(self.domain, configured_event_store)

self._event_store = store
return store

self._initialize_event_streams()
self._initialize_command_streams()
def _initialize(self) -> None:
logger.debug("Initializing Event Store...")

return self._event_store
# Initialize the Event Store
#
# An event store is always present by default. If not configured explicitly,
# a memory-based event store is used.
self._event_store = self._initialize_event_store()

def _initialize_event_streams(self):
self._event_streams = defaultdict(set)
self._initialize_event_streams()
self._initialize_command_streams()

def _initialize_event_streams(self):
for _, record in self.domain.registry.event_handlers.items():
stream_name = (
record.cls.meta_.stream_name
@@ -77,17 +79,12 @@ def _initialize_event_streams(self):
self._event_streams[stream_name].add(record.cls)

def _initialize_command_streams(self):
self._command_streams = defaultdict(set)

for _, record in self.domain.registry.command_handlers.items():
self._command_streams[record.cls.meta_.part_of.meta_.stream_name].add(
record.cls
)

def repository_for(self, part_of):
if self._event_store is None:
self._initialize()

repository_cls = type(
part_of.__name__ + "Repository", (BaseEventSourcedRepository,), {}
)
@@ -97,20 +94,14 @@ def repository_for(self, part_of):
return repository_cls(self.domain)

def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
if self._event_streams is None:
self._initialize_event_streams()

"""Return all handlers configured to run on the given event."""
# Gather handlers configured to run on all events
all_stream_handlers = self._event_streams.get("$all", set())

# Get the Aggregate's stream_name
aggregate_stream_name = None
if event.meta_.aggregate_cluster:
aggregate_stream_name = event.meta_.aggregate_cluster.meta_.stream_name

stream_name = event.meta_.stream_name or aggregate_stream_name
stream_handlers = self._event_streams.get(stream_name, set())

# Gather all handlers that are configured to run on this event
# Gather all handlers configured to run on this event
stream_handlers = self._event_streams.get(
event.meta_.part_of.meta_.stream_name, set()
)
configured_stream_handlers = set()
for stream_handler in stream_handlers:
if fqn(event.__class__) in stream_handler._handlers:
@@ -119,15 +110,12 @@ def handlers_for(self, event: BaseEvent) -> List[BaseEventHandler]:
return set.union(configured_stream_handlers, all_stream_handlers)

def command_handler_for(self, command: BaseCommand) -> Optional[BaseCommandHandler]:
if self._command_streams is None:
self._initialize_command_streams()

stream_name = command.meta_.stream_name or (
command.meta_.part_of.meta_.stream_name if command.meta_.part_of else None
)
if not command.meta_.part_of:
raise ConfigurationError(
f"Command `{command.__name__}` needs to be associated with an aggregate"
)

if not stream_name:
return None
stream_name = command.meta_.part_of.meta_.stream_name

handler_classes = self._command_streams.get(stream_name, set())

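> **For reviewers:** the reworked `handlers_for` resolves handlers as the union of two sets — handlers bound to the event's aggregate stream (filtered to those that actually declare the event) and handlers subscribed to the catch-all `$all` stream. A standalone sketch of that resolution logic, with illustrative handler and stream names:

```python
from collections import defaultdict

event_streams = defaultdict(set)  # stream name -> set of handler classes


class SendInvoice:
    # Pretend handler registered for the "order" stream; Protean tracks the
    # events a handler declares in `_handlers`, keyed by fully-qualified name.
    _handlers = {"orders.OrderShipped"}


class AuditTrail:
    # Pretend catch-all handler subscribed to the special "$all" stream.
    _handlers = {"orders.OrderShipped"}


event_streams["order"].add(SendInvoice)
event_streams["$all"].add(AuditTrail)


def handlers_for(stream_name, event_fqn):
    all_stream_handlers = event_streams.get("$all", set())
    stream_handlers = event_streams.get(stream_name, set())
    # Keep only stream handlers that declare this specific event
    configured = {h for h in stream_handlers if event_fqn in h._handlers}
    return set.union(configured, all_stream_handlers)


assert handlers_for("order", "orders.OrderShipped") == {SendInvoice, AuditTrail}
```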
4 changes: 2 additions & 2 deletions src/protean/adapters/event_store/memory.py
@@ -5,7 +5,7 @@
from protean.core.repository import BaseRepository
from protean.globals import current_domain
from protean.port.event_store import BaseEventStore
from protean.utils.mixins import MessageMetadata, MessageRecord
from protean.utils.mixins import MessageRecord


class MemoryMessage(BaseAggregate, MessageRecord):
@@ -52,7 +52,7 @@ def write(
position=next_position,
type=message_type,
data=data,
metadata=MessageMetadata(**metadata) if metadata else None,
metadata=metadata,
time=datetime.now(UTC),
)
)
27 changes: 12 additions & 15 deletions src/protean/adapters/repository/memory.py
@@ -21,12 +21,6 @@
from protean.reflection import attributes, fields, id_field
from protean.utils.query import Q

# Global in-memory store of dict data. Keyed by name, to provide
# multiple named local memory caches.
_databases = defaultdict(dict)
_locks = defaultdict(Lock)
_counters = defaultdict(count)


def derive_schema_name(model_cls):
if hasattr(model_cls.meta_, "schema_name"):
@@ -63,9 +57,9 @@ def __init__(self, provider, new_connection=False):
self._db = current_uow._sessions[self._provider.name]._db
else:
self._db = {
"data": copy.deepcopy(_databases),
"lock": _locks.setdefault(self._provider.name, Lock()),
"counters": _counters,
"data": copy.deepcopy(self._provider._databases),
"lock": self._provider._locks.setdefault(self._provider.name, Lock()),
"counters": self._provider._counters,
}

def add(self, element):
@@ -84,8 +78,7 @@ def commit(self):
if current_uow and self._provider.name in current_uow._sessions:
current_uow._sessions[self._provider.name]._db["data"] = self._db["data"]
else:
global _databases
_databases = self._db["data"]
self._provider._databases = self._db["data"]

def rollback(self):
pass
@@ -104,6 +97,11 @@ def __init__(self, name, domain, conn_info: dict):
conn_info["database"] = "memory"
super().__init__(name, domain, conn_info)

# Global in-memory store of dict data.
self._databases = defaultdict(dict)
self._locks = defaultdict(Lock)
self._counters = defaultdict(count)

# A temporary cache of already constructed model classes
self._model_classes = {}

@@ -122,10 +120,9 @@ def get_connection(self, session_cls=None):

def _data_reset(self):
"""Reset data"""
global _databases, _locks, _counters
_databases = defaultdict(dict)
_locks = defaultdict(Lock)
_counters = defaultdict(count)
self._databases = defaultdict(dict)
self._locks = defaultdict(Lock)
self._counters = defaultdict(count)

# Discard any active Unit of Work
if current_uow and current_uow.in_progress:
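> **For reviewers:** the `repository/memory.py` change above moves the module-level `_databases`/`_locks`/`_counters` globals into the provider instance, so each `MemoryProvider` owns its own storage and `_data_reset()` no longer clobbers other providers. A reduced sketch of the ownership change (illustrative; only the state handling is shown):

```python
from collections import defaultdict
from itertools import count
from threading import Lock


class MemoryProvider:
    def __init__(self, name: str):
        self.name = name
        self._databases = defaultdict(dict)   # per-provider tables
        self._locks = defaultdict(Lock)       # per-provider write locks
        self._counters = defaultdict(count)   # per-provider id sequences

    def _data_reset(self):
        # Resetting one provider leaves every other provider untouched
        self._databases = defaultdict(dict)
        self._locks = defaultdict(Lock)
        self._counters = defaultdict(count)


p1, p2 = MemoryProvider("default"), MemoryProvider("secondary")
p1._databases["users"][1] = {"name": "Jane"}
assert "users" not in p2._databases  # isolation holds
```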
17 changes: 17 additions & 0 deletions src/protean/cli/__init__.py
@@ -47,6 +47,7 @@
CORE = "CORE"
EVENTSTORE = "EVENTSTORE"
DATABASE = "DATABASE"
COVERAGE = "COVERAGE"
FULL = "FULL"


@@ -119,6 +120,22 @@
for store in ["MESSAGE_DB"]:
print(f"Running tests for EVENTSTORE: {store}...")
subprocess.call(commands + ["-m", "eventstore", f"--store={store}"])
case "COVERAGE":
subprocess.call(
commands
+ [
"--slow",
"--sqlite",
"--postgresql",
"--elasticsearch",
"--redis",
"--message_db",
"--cov=protean",
"--cov-config",
".coveragerc",
"tests",
]
)
case _:
print("Running core tests...")
subprocess.call(commands)
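> **Usage note:** the new `COVERAGE` branch runs the full adapter matrix with coverage collection in one pass. A condensed sketch of the dispatch — the base `commands` list is an assumption, and only the `COVERAGE` arm mirrors this diff:

```python
import subprocess


def run_tests(category: str = "CORE") -> None:
    commands = ["pytest"]  # assumed base command; the real CLI adds more flags
    match category.upper():
        case "COVERAGE":
            # Enable every adapter marker plus coverage reporting
            subprocess.call(
                commands
                + ["--slow", "--sqlite", "--postgresql", "--elasticsearch",
                   "--redis", "--message_db",
                   "--cov=protean", "--cov-config", ".coveragerc", "tests"]
            )
        case _:
            print("Running core tests...")
            subprocess.call(commands)
```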