Skip to content

Commit

Permalink
Simplify Event and Command class definitions
Browse files Browse the repository at this point in the history
There was a lot of duplicate code between Event and Command classes.
This commit unifies the common code in a base class called `BaseMessageType`
and inherits Event and Command element classes from it.
  • Loading branch information
subhashb committed Jul 18, 2024
1 parent d6374f1 commit efb2635
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 279 deletions.
4 changes: 2 additions & 2 deletions docs/guides/domain-definition/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ In [2]: renamed = UserRenamed(user_id=user.id, name="John Doe Jr.")
In [3]: renamed.name = "John Doe Sr."
...
IncorrectUsageError: {
'_event': [
'Event Objects are immutable and cannot be modified once created'
'_message': [
'Event/Command Objects are immutable and cannot be modified once created'
]
}
```
98 changes: 3 additions & 95 deletions src/protean/core/command.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
from protean.container import BaseContainer, OptionsMixin
from protean.core.event import Metadata
from protean.exceptions import (
IncorrectUsageError,
InvalidDataError,
NotSupportedError,
ValidationError,
)
from protean.fields import Field, ValueObject
from protean.fields.association import Association, Reference
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class, fqn
from protean.utils.eventing import BaseMessageType, Metadata


class BaseCommand(BaseContainer, OptionsMixin):
class BaseCommand(BaseMessageType):
"""Base Command class that all commands should inherit from.
Core functionality associated with commands, like timestamping and authentication, are specified
Expand All @@ -27,38 +23,9 @@ def __new__(cls, *args, **kwargs):
raise NotSupportedError("BaseCommand cannot be instantiated")
return super().__new__(cls)

# Track Metadata
_metadata = ValueObject(Metadata, default=lambda: Metadata()) # pragma: no cover

def __init_subclass__(subclass) -> None:
super().__init_subclass__()

if not subclass.meta_.abstract:
subclass.__track_id_field()

# Use explicit version if specified, else default to "v1"
if not hasattr(subclass, "__version__"):
setattr(subclass, "__version__", "v1")

subclass.__validate_for_basic_field_types()

@classmethod
def __validate_for_basic_field_types(subclass):
for field_name, field_obj in fields(subclass).items():
# Value objects can hold all kinds of fields, except associations
if isinstance(field_obj, (Reference, Association)):
raise IncorrectUsageError(
{
"_event": [
f"Commands cannot have associations. "
f"Remove {field_name} ({field_obj.__class__.__name__}) from class {subclass.__name__}"
]
}
)

def __init__(self, *args, **kwargs):
try:
super().__init__(*args, finalize=False, **kwargs)
super().__init__(*args, **kwargs)

version = (
self.__class__.__version__
Expand Down Expand Up @@ -86,65 +53,6 @@ def __init__(self, *args, **kwargs):
except ValidationError as exception:
raise InvalidDataError(exception.messages)

@property
def payload(self):
"""Return the payload of the event."""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
if field_name not in {"_metadata"}
}

def __setattr__(self, name, value):
if not hasattr(self, "_initialized") or not self._initialized:
return super().__setattr__(name, value)
else:
raise IncorrectUsageError(
{
"_command": [
"Command Objects are immutable and cannot be modified once created"
]
}
)

@classmethod
def _default_options(cls):
return [
("abstract", False),
("aggregate_cluster", None),
("part_of", None),
]

@classmethod
def __track_id_field(subclass):
"""Check if an identifier field has been associated with the command.
When an identifier is provided, its value is used to construct
unique stream name."""
try:
id_field = next(
field
for _, field in declared_fields(subclass).items()
if isinstance(field, (Field)) and field.identifier
)

setattr(subclass, _ID_FIELD_NAME, id_field.field_name)

except StopIteration:
# No Identity fields declared
pass

def to_dict(self):
"""Return data as a dictionary.
We need to override this method in Command, because `to_dict()` of `BaseContainer`
eliminates `_metadata`.
"""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
}


def command_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseCommand, **opts)
Expand Down
163 changes: 3 additions & 160 deletions src/protean/core/event.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,17 @@
import json
import logging
from datetime import datetime, timezone

from protean.container import BaseContainer, OptionsMixin
from protean.core.value_object import BaseValueObject
from protean.exceptions import (
ConfigurationError,
IncorrectUsageError,
NotSupportedError,
)
from protean.fields import DateTime, Field, Integer, String, ValueObject
from protean.fields.association import Association, Reference
from protean.globals import g
from protean.reflection import _ID_FIELD_NAME, declared_fields, fields
from protean.utils import DomainObjects, derive_element_class, fqn
from protean.utils.eventing import BaseMessageType, Metadata

logger = logging.getLogger(__name__)


class Metadata(BaseValueObject):
# Unique identifier of the event/command
#
# FIXME Fix the format documentation
# Event Format is <domain-name>.<class-name>.<version>.<aggregate-id>.<aggregate-version>
# Command Format is <domain-name>.<class-name>.<version>
id = String()

# Type of the event
# Format is <domain-name>.<event-class-name>.<event-version>
type = String()

# Fully Qualified Name of the event/command
fqn = String(sanitize=False)

# Kind of the object
# Can be one of "EVENT", "COMMAND"
kind = String()

# Name of the stream to which the event/command is written
stream = String()

# Name of the stream that originated this event/command
origin_stream = String()

# Time of event generation
timestamp = DateTime(default=lambda: datetime.now(timezone.utc))

# Version of the event
# Can be overridden with `__version__` class attr in event/command class definition
version = String(default="v1")

# Applies to Events only
# Sequence of the event in the aggregate
# This is the version of the aggregate as it will be *after* persistence.
#
# For Event Sourced aggregates, sequence_id is the same as version (like "1").
# For Regular aggregates, sequence_id is `version`.`eventnumber` (like "0.1"). This is to
# ensure that the ordering is possible even when multiple events are raised as past of
# single update.
sequence_id = String()

# Hash of the payload
payload_hash = Integer()


class BaseEvent(BaseContainer, OptionsMixin): # FIXME Remove OptionsMixin
class BaseEvent(BaseMessageType):
"""Base Event class that all Events should inherit from.
Core functionality associated with Events, like timestamping, are specified
Expand All @@ -78,81 +25,8 @@ def __new__(cls, *args, **kwargs):
raise NotSupportedError("BaseEvent cannot be instantiated")
return super().__new__(cls)

# Track Metadata
_metadata = ValueObject(Metadata, default=lambda: Metadata()) # pragma: no cover

def __init_subclass__(subclass) -> None:
super().__init_subclass__()

if not subclass.meta_.abstract:
subclass.__track_id_field()

# Use explicit version if specified, else default to "v1"
if not hasattr(subclass, "__version__"):
setattr(subclass, "__version__", "v1")

subclass.__validate_for_basic_field_types()

@classmethod
def __validate_for_basic_field_types(subclass):
for field_name, field_obj in fields(subclass).items():
# Value objects can hold all kinds of fields, except associations
if isinstance(field_obj, (Reference, Association)):
raise IncorrectUsageError(
{
"_event": [
f"Events cannot have associations. "
f"Remove {field_name} ({field_obj.__class__.__name__}) from class {subclass.__name__}"
]
}
)

def __setattr__(self, name, value):
if not hasattr(self, "_initialized") or not self._initialized:
return super().__setattr__(name, value)
else:
raise IncorrectUsageError(
{
"_event": [
"Event Objects are immutable and cannot be modified once created"
]
}
)

@classmethod
def _default_options(cls):
return [
("abstract", False),
("aggregate_cluster", None),
("part_of", None),
]

@classmethod
def __track_id_field(subclass):
"""Check if an identifier field has been associated with the command.
When an identifier is provided, its value is used to construct
unique stream name."""
try:
id_field = next(
field
for _, field in declared_fields(subclass).items()
if isinstance(field, (Field)) and field.identifier
)

setattr(subclass, _ID_FIELD_NAME, id_field.field_name)

except StopIteration:
# No Identity fields declared
pass

def __init__(self, *args, **kwargs):
super().__init__(*args, finalize=False, **kwargs)

if not hasattr(self.__class__, "__type__"):
raise ConfigurationError(
f"Event `{self.__class__.__name__}` should be registered with a domain"
)
super().__init__(*args, **kwargs)

# Store the expected version temporarily for use during persistence
self._expected_version = kwargs.pop("_expected_version", -1)
Expand All @@ -178,37 +52,6 @@ def __init__(self, *args, **kwargs):
# Finally lock the event and make it immutable
self._initialized = True

@property
def payload(self):
"""Return the payload of the event."""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
if field_name not in {"_metadata"}
}

def __eq__(self, other) -> bool:
"""Equivalence check based only on data."""
if type(other) is not type(self):
return False

return self._metadata.id == other._metadata.id

def __hash__(self) -> int:
"""Hash based on data."""
return hash(json.dumps(self.payload, sort_keys=True))

def to_dict(self):
"""Return data as a dictionary.
We need to override this method in Event, because `to_dict()` of `BaseContainer`
eliminates `_metadata`.
"""
return {
field_name: field_obj.as_dict(getattr(self, field_name, None))
for field_name, field_obj in fields(self).items()
}


def domain_event_factory(element_cls, domain, **opts):
element_cls = derive_element_class(element_cls, BaseEvent, **opts)
Expand Down
Loading

0 comments on commit efb2635

Please sign in to comment.