Skip to content

Commit

Permalink
Stream Enhancements - Part 3
Browse files Browse the repository at this point in the history
Changes:
- Add `_event_position` temp variable in aggregates to track last event
  position in event store
- Track expected version per event with `_event_position`
- Fetch last event position on aggregate load and update `_event_position`
- Unify the method that appends events to the event store across
  aggregates and event-sourced aggregates
- Unify the method that constructs a message from an event across
  aggregates and event-sourced aggregates
- Track database state within the memory provider instead of a global object
  • Loading branch information
subhashb committed Jul 7, 2024
1 parent b600eff commit 828a4b1
Show file tree
Hide file tree
Showing 34 changed files with 360 additions and 151 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"module": "pytest",
"justMyCode": false,
"args": [
"tests/adapters/model/sqlalchemy_model/postgresql/test_array_datatype.py::test_array_data_type_association",
"tests/adapters/model/sqlalchemy_model/postgresql/test_json_datatype.py::test_persistence_of_json_with_array_data",
"--postgresql"
]
},
Expand Down
27 changes: 12 additions & 15 deletions src/protean/adapters/repository/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@
from protean.reflection import attributes, fields, id_field
from protean.utils.query import Q

# Global in-memory store of dict data. Keyed by name, to provide
# multiple named local memory caches.
_databases = defaultdict(dict)
_locks = defaultdict(Lock)
_counters = defaultdict(count)


def derive_schema_name(model_cls):
if hasattr(model_cls.meta_, "schema_name"):
Expand Down Expand Up @@ -63,9 +57,9 @@ def __init__(self, provider, new_connection=False):
self._db = current_uow._sessions[self._provider.name]._db
else:
self._db = {
"data": copy.deepcopy(_databases),
"lock": _locks.setdefault(self._provider.name, Lock()),
"counters": _counters,
"data": copy.deepcopy(self._provider._databases),
"lock": self._provider._locks.setdefault(self._provider.name, Lock()),
"counters": self._provider._counters,
}

def add(self, element):
Expand All @@ -84,8 +78,7 @@ def commit(self):
if current_uow and self._provider.name in current_uow._sessions:
current_uow._sessions[self._provider.name]._db["data"] = self._db["data"]
else:
global _databases
_databases = self._db["data"]
self._provider._databases = self._db["data"]

def rollback(self):
    """Discard pending changes.

    No-op for the in-memory session: mutations are applied to this
    session's own copy of the data and are only merged back into the
    provider on ``commit``, so there is nothing to undo here.
    """
    pass
Expand All @@ -104,6 +97,11 @@ def __init__(self, name, domain, conn_info: dict):
conn_info["database"] = "memory"
super().__init__(name, domain, conn_info)

# Global in-memory store of dict data.
self._databases = defaultdict(dict)
self._locks = defaultdict(Lock)
self._counters = defaultdict(count)

# A temporary cache of already constructed model classes
self._model_classes = {}

Expand All @@ -122,10 +120,9 @@ def get_connection(self, session_cls=None):

def _data_reset(self):
"""Reset data"""
global _databases, _locks, _counters
_databases = defaultdict(dict)
_locks = defaultdict(Lock)
_counters = defaultdict(count)
self._databases = defaultdict(dict)
self._locks = defaultdict(Lock)
self._counters = defaultdict(count)

# Discard any active Unit of Work
if current_uow and current_uow.in_progress:
Expand Down
7 changes: 7 additions & 0 deletions src/protean/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ def __setattr__(self, name, value):
"_root", # Root entity in the hierarchy
"_owner", # Owner entity in the hierarchy
"_disable_invariant_checks", # Flag to disable invariant checks
"_next_version", # Temp placeholder to track next version of the entity
"_event_position", # Temp placeholder to track event version of the entity
"_expected_version", # Temp placeholder to track expected version of an event
]
or name.startswith(("add_", "remove_", "get_one_from_", "filter_"))
):
Expand Down Expand Up @@ -408,6 +411,7 @@ def raise_(self, event, fact_event=False) -> None:

event_with_metadata = event.__class__(
event.to_dict(),
_expected_version=self._event_position,
_metadata={
"id": (f"{stream_name}-{identifier}-{self._version}"),
"type": f"{self.__class__.__name__}.{event.__class__.__name__}.{event._metadata.version}",
Expand All @@ -426,6 +430,9 @@ def raise_(self, event, fact_event=False) -> None:
},
)

# Increment the event position after generating event
self._event_position = self._event_position + 1

self._events.append(event_with_metadata)


Expand Down
9 changes: 7 additions & 2 deletions src/protean/core/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ def __new__(cls, *args, **kwargs):
_version = Integer(default=-1)

# Temporary variable to track next version of Aggregate
_next_version = Integer(default=0)
_next_version = 0

# Temporary variable to track version of events of Aggregate
# This can be different from the version of the Aggregate itself because
# a single aggregate update could have triggered multiple events.
_event_position = -1

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -95,7 +100,7 @@ def element_to_fact_event(element_cls):
attrs = {
key: value
for key, value in fields(element_cls).items()
if not isinstance(value, Reference) and key not in ["_next_version"]
if not isinstance(value, Reference)
}

# Recursively convert HasOne and HasMany associations to Value Objects
Expand Down
18 changes: 17 additions & 1 deletion src/protean/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from functools import partial

from protean.container import BaseContainer, IdentityMixin, OptionsMixin
from protean.exceptions import IncorrectUsageError, NotSupportedError, ValidationError
from protean.exceptions import (
ConfigurationError,
IncorrectUsageError,
NotSupportedError,
ValidationError,
)
from protean.fields import Auto, HasMany, Reference, ValueObject
from protean.fields.association import Association
from protean.reflection import (
Expand Down Expand Up @@ -427,6 +432,13 @@ def raise_(self, event) -> None:
The event is always registered on the aggregate, irrespective of where
it is raised in the entity cluster."""
# Verify that event is indeed associated with this aggregate
if event.meta_.part_of != self._root.__class__:
raise ConfigurationError(
f"Event `{event.__class__.__name__}` is not associated with"
f" aggregate `{self._root.__class__.__name__}`"
)

# Events are sometimes raised from within the aggregate, well-before persistence.
# In that case, the aggregate's next version has to be considered in events,
# because we want to associate the event with the version that will be persisted.
Expand All @@ -452,6 +464,7 @@ def raise_(self, event) -> None:

event_with_metadata = event.__class__(
event.to_dict(),
_expected_version=self._root._event_position,
_metadata={
"id": (
f"{stream_name}-{identifier}-{aggregate_version}.{event_number}"
Expand All @@ -472,6 +485,9 @@ def raise_(self, event) -> None:
},
)

# Increment the event position after generating event
self._root._event_position = self._root._event_position + 1

self._root._events.append(event_with_metadata)

def __eq__(self, other):
Expand Down
3 changes: 3 additions & 0 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def __track_id_field(subclass):
def __init__(self, *args, **kwargs):
super().__init__(*args, finalize=False, **kwargs)

# Store the expected version temporarily for use during persistence
self._expected_version = kwargs.pop("_expected_version", -1)

version = (
self.__class__.__version__
if hasattr(self.__class__, "__version__")
Expand Down
5 changes: 5 additions & 0 deletions src/protean/core/event_sourced_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class BaseEventSourcedAggregate(
# Track current version of Aggregate
_version = Integer(default=-1)

# Temporary variable to track version of events of Aggregate
# This can be different from the version of the Aggregate itself because
# a single aggregate update could have triggered multiple events.
_event_position = -1

def __new__(cls, *args, **kwargs):
if cls is BaseEventSourcedAggregate:
raise NotSupportedError("BaseEventSourcedAggregate cannot be instantiated")
Expand Down
2 changes: 2 additions & 0 deletions src/protean/core/event_sourced_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def get(self, identifier: Identifier) -> BaseEventSourcedAggregate:
}
)

aggregate._event_position = aggregate._version

return aggregate


Expand Down
14 changes: 11 additions & 3 deletions src/protean/core/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from protean.core.unit_of_work import UnitOfWork
from protean.exceptions import IncorrectUsageError, NotSupportedError
from protean.fields import HasMany, HasOne
from protean.globals import current_uow
from protean.globals import current_domain, current_uow
from protean.port.dao import BaseDAO
from protean.port.provider import BaseProvider
from protean.reflection import association_fields, has_association_fields
Expand Down Expand Up @@ -139,7 +139,6 @@ def add(self, aggregate: BaseAggregate) -> BaseAggregate: # noqa: C901

# Remove state attribute from the payload, as it is not needed for the Fact Event
payload.pop("state_", None)
payload.pop("_next_version", None)

# Construct and raise the Fact Event
fact_event = aggregate._fact_event_cls(**payload)
Expand Down Expand Up @@ -246,7 +245,16 @@ def get(self, identifier) -> BaseAggregate:
`find_residents_of_area(zipcode)`, etc. It is also possible to make use of more complicated,
domain-friendly design patterns like the `Specification` pattern.
"""
return self._dao.get(identifier)
aggregate = self._dao.get(identifier)

# Fetch and sync events version
last_message = current_domain.event_store.store.read_last_message(
f"{aggregate.meta_.stream_name}-{identifier}"
)
if last_message:
aggregate._event_position = last_message.position

return aggregate


def repository_factory(element_cls, **opts):
Expand Down
15 changes: 4 additions & 11 deletions src/protean/core/unit_of_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)
from protean.globals import _uow_context_stack, current_domain
from protean.reflection import id_field
from protean.utils import DomainObjects, EventProcessing, fqn
from protean.utils import EventProcessing, fqn

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,16 +78,9 @@ def commit(self): # noqa: C901
events = []
for _, item in self._identity_map.items():
if item._events:
if item.element_type == DomainObjects.EVENT_SOURCED_AGGREGATE:
for event in item._events:
current_domain.event_store.store.append_aggregate_event(
item, event
)
events.append((item, event))
else:
for event in item._events:
current_domain.event_store.store.append(event)
events.append((item, event))
for event in item._events:
current_domain.event_store.store.append(event)
events.append((item, event))
item._events = []

# Iteratively consume all events produced in this session
Expand Down
21 changes: 6 additions & 15 deletions src/protean/port/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ def read(
def read_last_message(self, stream_name) -> Message:
# FIXME Rename to read_last_stream_message
raw_message = self._read_last_message(stream_name)
return Message.from_dict(raw_message)
if raw_message:
return Message.from_dict(raw_message)

def append_aggregate_event(
self, aggregate: BaseEventSourcedAggregate, event: BaseEvent
) -> int:
message = Message.to_aggregate_event_message(aggregate, event)
return None

def append(self, object: Union[BaseEvent, BaseCommand]) -> int:
message = Message.to_message(object)

position = self._write(
message.stream_name,
Expand All @@ -102,16 +103,6 @@ def append_aggregate_event(

return position

def append(self, object: Union[BaseEvent, BaseCommand]) -> int:
message = Message.to_message(object)

return self._write(
message.stream_name,
message.type,
message.data,
metadata=message.metadata.to_dict(),
)

def load_aggregate(
self, part_of: Type[BaseEventSourcedAggregate], identifier: Identifier
) -> Optional[BaseEventSourcedAggregate]:
Expand Down
2 changes: 0 additions & 2 deletions src/protean/reflection.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def data_fields(class_or_instance):
fields_dict = dict(getattr(class_or_instance, _FIELDS))

# Remove internal fields
fields_dict.pop("_next_version", None)
fields_dict.pop("_metadata", None)
except AttributeError:
raise IncorrectUsageError(
Expand Down Expand Up @@ -118,7 +117,6 @@ def declared_fields(class_or_instance):

# Remove internal fields
fields_dict.pop("_version", None)
fields_dict.pop("_next_version", None)
fields_dict.pop("_metadata", None)
except AttributeError:
raise IncorrectUsageError(
Expand Down
32 changes: 10 additions & 22 deletions src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from protean.container import BaseContainer, OptionsMixin
from protean.core.command import BaseCommand
from protean.core.event import BaseEvent, Metadata
from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate
from protean.core.unit_of_work import UnitOfWork
from protean.exceptions import ConfigurationError
from protean.globals import current_domain
Expand Down Expand Up @@ -81,25 +80,6 @@ def from_dict(cls, message: Dict) -> Message:
id=message["id"],
)

@classmethod
def to_aggregate_event_message(
cls, aggregate: BaseEventSourcedAggregate, event: BaseEvent
) -> Message:
# If this is a Fact Event, don't set an expected version.
# Otherwise, expect the previous version
if event.__class__.__name__.endswith("FactEvent"):
expected_version = None
else:
expected_version = int(event._metadata.sequence_id) - 1

return cls(
stream_name=event._metadata.stream_name,
type=fully_qualified_name(event.__class__),
data=event.to_dict(),
metadata=event._metadata,
expected_version=expected_version,
)

def to_object(self) -> Union[BaseEvent, BaseCommand]:
if self.metadata.kind == MessageType.EVENT.value:
element_record = current_domain.registry.events[self.type]
Expand All @@ -123,13 +103,21 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message:
"Either specify an explicit stream name or associate the event with an aggregate."
)

stream_name = message_object._metadata.stream_name
# Set the expected version of the stream
# Applies only to events
expected_version = None
if message_object._metadata.kind == MessageType.EVENT.value:
# If this is a Fact Event, don't set an expected version.
# Otherwise, expect the previous version
if not message_object.__class__.__name__.endswith("FactEvent"):
expected_version = message_object._expected_version

return cls(
stream_name=stream_name,
stream_name=message_object._metadata.stream_name,
type=fully_qualified_name(message_object.__class__),
data=message_object.to_dict(),
metadata=message_object._metadata,
expected_version=expected_version,
)


Expand Down
Loading

0 comments on commit 828a4b1

Please sign in to comment.