diff --git a/src/protean/__init__.py b/src/protean/__init__.py index e44a6720..185a038d 100644 --- a/src/protean/__init__.py +++ b/src/protean/__init__.py @@ -1,6 +1,6 @@ __version__ = "0.12.1" -from .core.aggregate import BaseAggregate, atomic_change +from .core.aggregate import BaseAggregate, apply, atomic_change from .core.application_service import BaseApplicationService from .core.command import BaseCommand from .core.command_handler import BaseCommandHandler @@ -9,7 +9,6 @@ from .core.entity import BaseEntity, invariant from .core.event import BaseEvent from .core.event_handler import BaseEventHandler -from .core.event_sourced_aggregate import BaseEventSourcedAggregate, apply from .core.model import BaseModel from .core.queryset import Q, QuerySet from .core.repository import BaseRepository @@ -33,7 +32,6 @@ "BaseEntity", "BaseEvent", "BaseEventHandler", - "BaseEventSourcedAggregate", "BaseModel", "BaseRepository", "BaseSerializer", diff --git a/src/protean/core/aggregate.py b/src/protean/core/aggregate.py index a27e73e1..c88d05f8 100644 --- a/src/protean/core/aggregate.py +++ b/src/protean/core/aggregate.py @@ -1,14 +1,16 @@ """Aggregate Functionality and Classes""" +import functools import inspect import logging +import typing from collections import defaultdict from typing import List from protean.core.entity import BaseEntity from protean.core.event import BaseEvent from protean.core.value_object import BaseValueObject -from protean.exceptions import NotSupportedError +from protean.exceptions import IncorrectUsageError, NotSupportedError from protean.fields import HasMany, HasOne, Integer, Reference, ValueObject from protean.fields import List as ProteanList from protean.reflection import fields @@ -235,3 +237,43 @@ def __exit__(self, *args): # Validate on exit to trigger invariant checks self.aggregate._disable_invariant_checks = False self.aggregate._postcheck() + + +def apply(fn): + """Decorator to mark methods in EventHandler classes.""" + + if len(typing.get_type_hints(fn)) > 2: + raise IncorrectUsageError( + { + "_entity": [ + f"Handler method `{fn.__name__}` has incorrect number of arguments" + ] + } + ) + + try: + _event_cls = next( + iter( + { + value + for value in typing.get_type_hints(fn).values() + if inspect.isclass(value) and issubclass(value, BaseEvent) + } + ) + ) + except StopIteration: + raise IncorrectUsageError( + { + "_entity": [ + f"Apply method `{fn.__name__}` should accept an argument annotated with the Event class" + ] + } + ) + + @functools.wraps(fn) + def wrapper(*args): + fn(*args) + + setattr(wrapper, "_event_cls", _event_cls) + + return wrapper diff --git a/src/protean/core/event_sourced_aggregate.py b/src/protean/core/event_sourced_aggregate.py deleted file mode 100644 index 37ab28e2..00000000 --- a/src/protean/core/event_sourced_aggregate.py +++ /dev/null @@ -1,183 +0,0 @@ -import functools -import inspect -import logging -import typing -from collections import defaultdict -from typing import List - -from protean.container import BaseContainer, EventedMixin, IdentityMixin, OptionsMixin -from protean.core.event import BaseEvent -from protean.exceptions import IncorrectUsageError, NotSupportedError -from protean.fields import Integer -from protean.reflection import id_field -from protean.utils import ( - DomainObjects, - derive_element_class, - fully_qualified_name, - inflection, -) - -logger = logging.getLogger(__name__) - - -class BaseEventSourcedAggregate( - OptionsMixin, IdentityMixin, EventedMixin, BaseContainer -): - """Base Event Sourced Aggregate class that all EventSourced Aggregates should inherit from. - - The order of inheritance is important. We want BaseContainer to be initialised first followed by - OptionsMixin (so that `meta_` is in place) before inheriting other mixins.""" - - element_type = DomainObjects.EVENT_SOURCED_AGGREGATE - - # Track current version of Aggregate - _version = Integer(default=-1) - - # Temporary variable to track version of events of Aggregate - # This can be different from the version of the Aggregate itself because - # a single aggregate update could have triggered multiple events. - _event_position = -1 - - def __new__(cls, *args, **kwargs): - if cls is BaseEventSourcedAggregate: - raise NotSupportedError("BaseEventSourcedAggregate cannot be instantiated") - return super().__new__(cls) - - @classmethod - def _default_options(cls): - return [ - ("aggregate_cluster", None), - ("auto_add_id_field", True), - ("fact_events", False), - ("stream_category", inflection.underscore(cls.__name__)), - ] - - def __init_subclass__(subclass) -> None: - super().__init_subclass__() - - # Associate a `_projections` map with subclasses. - # It needs to be initialized here because if it - # were initialized in __init__, the same collection object - # would be made available across all subclasses, - # defeating its purpose. - setattr(subclass, "_projections", defaultdict(set)) - - # Store associated events - setattr(subclass, "_events_cls_map", {}) - - def __eq__(self, other): - """Equivalence check to be based only on Identity""" - - # FIXME Enhanced Equality Checks - # * Ensure IDs have values and both of them are not null - # * Ensure that the ID is of the right type - # * Ensure that Objects belong to the same `type` - # * Check Reference equality - - # FIXME Check if `==` and `in` operator work with __eq__ - - if type(other) is type(self): - self_id = getattr(self, id_field(self).field_name) - other_id = getattr(other, id_field(other).field_name) - - return self_id == other_id - - return False - - def __hash__(self): - """Overrides the default implementation and bases hashing on identity""" - - # FIXME Add Object Class Type to hash - return hash(getattr(self, id_field(self).field_name)) - - def _apply(self, event: BaseEvent) -> None: - """Apply the event onto the aggregate by calling the appropriate projection. - - Args: - event (BaseEvent): Event object to apply - """ - # FIXME Handle case of missing projection - event_name = fully_qualified_name(event.__class__) - - # FIXME Handle case of missing projection method - if event_name not in self._projections: - raise NotImplementedError( - f"No handler registered for event `{event_name}` in `{self.__class__.__name__}`" - ) - - for fn in self._projections[event_name]: - # Call event handler method - fn(self, event) - self._version += 1 - - @classmethod - def from_events(cls, events: List[BaseEvent]) -> "BaseEventSourcedAggregate": - """Reconstruct an aggregate from a list of events.""" - # Initialize the aggregate with the first event's payload and apply it - aggregate = cls(**events[0].payload) - aggregate._apply(events[0]) - - # Apply the rest of the events - for event in events[1:]: - aggregate._apply(event) - - return aggregate - - -def apply(fn): - """Decorator to mark methods in EventHandler classes.""" - - if len(typing.get_type_hints(fn)) > 2: - raise IncorrectUsageError( - { - "_entity": [ - f"Handler method `{fn.__name__}` has incorrect number of arguments" - ] - } - ) - - try: - _event_cls = next( - iter( - { - value - for value in typing.get_type_hints(fn).values() - if inspect.isclass(value) and issubclass(value, BaseEvent) - } - ) - ) - except StopIteration: - raise IncorrectUsageError( - { - "_entity": [ - f"Apply method `{fn.__name__}` should accept an argument annotated with the Event class" - ] - } - ) - - @functools.wraps(fn) - def wrapper(*args): - fn(*args) - - setattr(wrapper, "_event_cls", _event_cls) - - return wrapper - - -def event_sourced_aggregate_factory(element_cls, domain, **opts): - element_cls = derive_element_class(element_cls, BaseEventSourcedAggregate, **opts) - - # Iterate through methods marked as `@apply` and construct a projections map - methods = inspect.getmembers(element_cls, predicate=inspect.isroutine) - for method_name, method in methods: - if not ( - method_name.startswith("__") and method_name.endswith("__") - ) and hasattr(method, "_event_cls"): - element_cls._projections[fully_qualified_name(method._event_cls)].add( - method - ) - element_cls._events_cls_map[fully_qualified_name(method._event_cls)] = ( - method._event_cls - ) - - return element_cls diff --git a/src/protean/core/event_sourced_repository.py b/src/protean/core/event_sourced_repository.py index acb4f1fe..f605504d 100644 --- a/src/protean/core/event_sourced_repository.py +++ b/src/protean/core/event_sourced_repository.py @@ -1,6 +1,6 @@ import logging -from protean import BaseEventSourcedAggregate, UnitOfWork +from protean import BaseAggregate, UnitOfWork from protean.container import Element, OptionsMixin from protean.exceptions import ( IncorrectUsageError, @@ -30,7 +30,7 @@ def __new__(cls, *args, **kwargs): def __init__(self, domain) -> None: self._domain = domain - def add(self, aggregate: BaseEventSourcedAggregate) -> None: + def add(self, aggregate: BaseAggregate) -> None: if aggregate is None: raise IncorrectUsageError( {"_entity": ["Aggregate object to persist is invalid"]} @@ -63,7 +63,7 @@ def add(self, aggregate: BaseEventSourcedAggregate) -> None: if own_current_uow: own_current_uow.commit() - def get(self, identifier: Identifier) -> BaseEventSourcedAggregate: + def get(self, identifier: Identifier) -> BaseAggregate: """Retrieve a fully-formed Aggregate from a stream of Events. If the aggregate was already loaded in the current UnitOfWork, diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index badc6c98..d45ea172 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -445,7 +445,6 @@ def factory_for(self, domain_object_type): from protean.core.entity import entity_factory from protean.core.event import domain_event_factory from protean.core.event_handler import event_handler_factory - from protean.core.event_sourced_aggregate import event_sourced_aggregate_factory from protean.core.event_sourced_repository import ( event_sourced_repository_factory, ) @@ -463,7 +462,6 @@ def factory_for(self, domain_object_type): DomainObjects.COMMAND_HANDLER.value: command_handler_factory, DomainObjects.EVENT.value: domain_event_factory, DomainObjects.EVENT_HANDLER.value: event_handler_factory, - DomainObjects.EVENT_SOURCED_AGGREGATE.value: event_sourced_aggregate_factory, DomainObjects.EVENT_SOURCED_REPOSITORY.value: event_sourced_repository_factory, DomainObjects.DOMAIN_SERVICE.value: domain_service_factory, DomainObjects.EMAIL.value: email_factory, @@ -568,7 +566,6 @@ def _resolve_references(self): field_obj.to_cls, ( DomainObjects.AGGREGATE, - DomainObjects.EVENT_SOURCED_AGGREGATE, DomainObjects.ENTITY, ), ) @@ -584,10 +581,7 @@ def _resolve_references(self): cls = params to_cls = self.fetch_element_cls_from_registry( cls.meta_.part_of, - ( - DomainObjects.AGGREGATE, - DomainObjects.EVENT_SOURCED_AGGREGATE, - ), + (DomainObjects.AGGREGATE,), ) cls.meta_.part_of = to_cls case _: @@ -810,12 +804,10 @@ def _validate_domain(self): def _assign_aggregate_clusters(self): """Assign Aggregate Clusters to all relevant elements""" from protean.core.aggregate import BaseAggregate - from protean.core.event_sourced_aggregate import BaseEventSourcedAggregate # Assign Aggregates and EventSourcedAggregates to their own cluster for element_type in [ DomainObjects.AGGREGATE, - DomainObjects.EVENT_SOURCED_AGGREGATE, ]: for _, element in self.registry._elements[element_type.value].items(): element.cls.meta_.aggregate_cluster = element.cls @@ -830,9 +822,7 @@ def _assign_aggregate_clusters(self): part_of = element.cls.meta_.part_of if part_of: # Traverse up the graph tree to find the root aggregate - while not issubclass( - part_of, (BaseAggregate, BaseEventSourcedAggregate) - ): + while not issubclass(part_of, BaseAggregate): part_of = part_of.meta_.part_of element.cls.meta_.aggregate_cluster = part_of @@ -978,10 +968,7 @@ def _setup_event_handlers(self): def _generate_fact_event_classes(self): """Generate FactEvent classes for all aggregates with `fact_events` enabled""" - for element_type in [ - DomainObjects.AGGREGATE, - DomainObjects.EVENT_SOURCED_AGGREGATE, - ]: + for element_type in [DomainObjects.AGGREGATE]: for _, element in self.registry._elements[element_type.value].items(): if element.cls.meta_.fact_events: event_cls = element_to_fact_event(element.cls) @@ -1029,13 +1016,6 @@ def event_handler(self, _cls=None, **kwargs): **kwargs, ) - def event_sourced_aggregate(self, _cls=None, **kwargs): - return self._domain_element( - DomainObjects.EVENT_SOURCED_AGGREGATE, - _cls=_cls, - **kwargs, - ) - def domain_service(self, _cls=None, **kwargs): return self._domain_element( DomainObjects.DOMAIN_SERVICE, diff --git a/src/protean/port/event_store.py b/src/protean/port/event_store.py index 586dcc1d..2d1e0b1c 100644 --- a/src/protean/port/event_store.py +++ b/src/protean/port/event_store.py @@ -2,7 +2,7 @@ from collections import deque from typing import Any, Dict, List, Optional, Type, Union -from protean import BaseCommand, BaseEvent, BaseEventSourcedAggregate +from protean import BaseAggregate, BaseCommand, BaseEvent from protean.fields import Identifier from protean.utils.mixins import Message @@ -104,8 +104,8 @@ def append(self, object: Union[BaseEvent, BaseCommand]) -> int: return position def load_aggregate( - self, part_of: Type[BaseEventSourcedAggregate], identifier: Identifier - ) -> Optional[BaseEventSourcedAggregate]: + self, part_of: Type[BaseAggregate], identifier: Identifier + ) -> Optional[BaseAggregate]: """Load an aggregate from underlying events. The first event is used to initialize the aggregate, after which each event is diff --git a/src/protean/utils/__init__.py b/src/protean/utils/__init__.py index b2b29712..9182cbda 100644 --- a/src/protean/utils/__init__.py +++ b/src/protean/utils/__init__.py @@ -118,7 +118,6 @@ class DomainObjects(Enum): COMMAND_HANDLER = "COMMAND_HANDLER" EVENT = "EVENT" EVENT_HANDLER = "EVENT_HANDLER" - EVENT_SOURCED_AGGREGATE = "EVENT_SOURCED_AGGREGATE" EVENT_SOURCED_REPOSITORY = "EVENT_SOURCED_REPOSITORY" DOMAIN_SERVICE = "DOMAIN_SERVICE" EMAIL = "EMAIL" diff --git a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py index 16b937ac..76a39588 100644 --- a/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py +++ b/tests/event_sourced_aggregates/test_raising_events_from_within_aggregates.py @@ -4,9 +4,8 @@ import pytest -from protean import BaseAggregate, BaseCommandHandler, BaseEvent, handle +from protean import BaseAggregate, BaseCommandHandler, BaseEvent, apply, handle from protean.core.command import BaseCommand -from protean.core.event_sourced_aggregate import apply from protean.fields import Identifier, String from protean.globals import current_domain diff --git a/tests/event_store/test_appending_aggregate_events.py b/tests/event_store/test_appending_aggregate_events.py index 408a95fc..62e54a30 100644 --- a/tests/event_store/test_appending_aggregate_events.py +++ b/tests/event_store/test_appending_aggregate_events.py @@ -4,8 +4,7 @@ import pytest -from protean import BaseAggregate, BaseEvent -from protean.core.event_sourced_aggregate import apply +from protean import BaseAggregate, BaseEvent, apply from protean.fields import String from protean.fields.basic import Identifier diff --git a/tests/test_registry.py b/tests/test_registry.py index 804e4982..31382ba5 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -96,7 +96,6 @@ def test_properties_method_returns_a_dictionary_of_all_protean_elements(): "emails": DomainObjects.EMAIL.value, "entities": DomainObjects.ENTITY.value, "event_handlers": DomainObjects.EVENT_HANDLER.value, - "event_sourced_aggregates": DomainObjects.EVENT_SOURCED_AGGREGATE.value, "event_sourced_repositories": DomainObjects.EVENT_SOURCED_REPOSITORY.value, "events": DomainObjects.EVENT.value, "models": DomainObjects.MODEL.value,