Bugfixes to reflect correct versions in Events
Also:
- Introduce `data_fields` to reflect `_version` in `to_dict()` output
- Avoid associating aggregates to events via the `apply()` method.
  All events need to be registered with `part_of` explicitly.
- Track current and next version better with `_next_version` field in aggregates.
- Support registering custom Event Sourced repositories, in preparation for
  accepting custom SQL to run on MessageDB store.
- Aggregate core concepts documentation
subhashb committed Jul 1, 2024
1 parent 28a5cca commit 8dae00d
Showing 37 changed files with 905 additions and 100 deletions.
74 changes: 74 additions & 0 deletions docs/core-concepts/building-blocks/aggregates.md
@@ -1,2 +1,76 @@
# Aggregates

An aggregate is a cluster of domain objects that can be treated as a single
unit for data changes.

Each aggregate has a root entity, known as the aggregate root, which is
responsible for enforcing business rules and ensuring the consistency of
changes within the aggregate. In Protean, **aggregate** and **aggregate root**
are synonymous.

Aggregates help to maintain the integrity of the data by defining boundaries
within which invariants must be maintained.

## Facts

### Aggregates are black boxes. { data-toc-label="Black Boxes" }
The external world communicates with aggregates solely through their published
API. Aggregates, in turn, communicate with the external world through domain
events.

### Aggregates are versioned. { data-toc-label="Versioning" }
The version is a simple incrementing number. Every aggregate instance's version
starts at 0.

### Aggregates have concurrency control. { data-toc-label="Concurrency Control" }
Aggregates are persisted with optimistic concurrency. If the expected version
of the aggregate does not match the version in the database, the transaction
is aborted.
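
The version check described above can be sketched in plain Python. This is an illustration only, not Protean's implementation: `InMemoryStore`, `ConcurrencyError`, and `save` are hypothetical names, and the sketch assumes versions start at 0 and increase by one on each successful write.

```python
class ConcurrencyError(Exception):
    """Raised when the stored version differs from the expected version."""


class InMemoryStore:
    """Hypothetical store keeping a (version, data) pair per aggregate id."""

    def __init__(self):
        self._rows = {}

    def load(self, aggregate_id):
        # Unknown aggregates are treated as version 0 with no data.
        return self._rows.get(aggregate_id, (0, None))

    def save(self, aggregate_id, expected_version, data):
        current_version, _ = self.load(aggregate_id)
        if current_version != expected_version:
            # Another writer got there first: abort instead of overwriting.
            raise ConcurrencyError(
                f"expected version {expected_version}, found {current_version}"
            )
        self._rows[aggregate_id] = (current_version + 1, data)
        return current_version + 1


store = InMemoryStore()
new_version = store.save("user-1", expected_version=0, data={"name": "A"})

try:
    # A second writer still holding version 0 is rejected.
    store.save("user-1", expected_version=0, data={"name": "B"})
    conflict = False
except ConcurrencyError:
    conflict = True
```

The rejected writer would typically reload the aggregate, reapply its change, and retry.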

### Aggregates enclose business invariants. { data-toc-label="Invariants" }

Aggregates contain invariants that should be satisfied at all times - they
are checked before and after every change to the aggregate. Invariants can be
specified at the level of an aggregate's fields, the entire aggregate cluster,
individual entities, or domain services that operate on multiple aggregates.
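
A minimal sketch of checking an invariant before and after a change, in plain Python. `Order`, `InvariantError`, and `_check_invariants` are hypothetical names chosen for illustration; Protean's own mechanism for declaring invariants is not shown here.

```python
class InvariantError(Exception):
    """Raised when an aggregate invariant does not hold."""


class Order:
    """Hypothetical aggregate: `total` must equal the sum of item amounts."""

    def __init__(self):
        self.items = []
        self.total = 0

    def _check_invariants(self):
        if self.total != sum(self.items):
            raise InvariantError("total must equal the sum of item amounts")

    def add_item(self, amount):
        self._check_invariants()  # before the change
        self.items.append(amount)
        self.total += amount
        self._check_invariants()  # after the change


order = Order()
order.add_item(40)
order.add_item(2)

order.total = 100  # simulate state corrupted outside the aggregate's API
try:
    order.add_item(1)
    rejected = False
except InvariantError:
    rejected = True
```

Because the check also runs before the change, the corrupted aggregate refuses further modification rather than compounding the inconsistency.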

## Object Graphs

Aggregates compose a graph of enclosed elements. The objects themselves can nest
other objects and so on indefinitely, though nesting beyond two levels is not
recommended.

### Aggregates can hold two types of objects - Entities and Value Objects. { data-toc-label="Types of Objects" }
Entities are objects with an identity. Value objects don't have identity; their
data defines their identity.

### Entities are accessible only via aggregates. { data-toc-label="Entity Access" }
Entities within aggregates are loaded and accessible only through the aggregate.
All changes to entities should be driven through the aggregates.

## Persistence

Data persistence and retrieval are always at the level of an aggregate.
They internally load and manage the objects within their cluster.

### Aggregates persist data with the help of Repositories. { data-toc-label="Repositories" }

Aggregates are persisted and retrieved with the help of repositories.
Repositories are collection-oriented - they mimic how a collection data type,
like list, dictionary and set, would work. Repositories can be augmented with
custom methods to perform business queries.
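
To illustrate what "collection-oriented" means, here is a sketch of an in-memory repository in plain Python. `UserRepository`, `add`, `get`, and `active_users` are hypothetical names for this example and are not taken from Protean's repository API.

```python
class UserRepository:
    """Hypothetical in-memory repository that behaves like a collection."""

    def __init__(self):
        self._users = {}

    def add(self, user):
        # Adding an existing id replaces the stored aggregate, like a dict.
        self._users[user["id"]] = user

    def get(self, user_id):
        return self._users[user_id]

    def __len__(self):
        return len(self._users)

    def __contains__(self, user_id):
        return user_id in self._users

    def active_users(self):
        # A custom method that answers a business query.
        return [u for u in self._users.values() if u["status"] == "ACTIVE"]


repo = UserRepository()
repo.add({"id": "1", "name": "Jane", "status": "ACTIVE"})
repo.add({"id": "2", "name": "John", "status": "INACTIVE"})

active_names = [u["name"] for u in repo.active_users()]
```

Callers work with the repository as they would with a dictionary or set, without knowing how the data is stored.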

### Aggregates are transaction boundaries. { data-toc-label="Transactions" }

All changes to aggregates are performed within a transaction. This means that
all objects in the aggregate's cluster are enclosed in a single transaction
during persistence. It also means that all objects within an aggregate cluster
are kept together in the same persistence store.

### Aggregates can enclose up to 500 entities. { data-toc-label="Limits" }

The object graph under an aggregate is loaded eagerly. The number of associations
under an aggregate is limited to 500. If you expect the number of entities to
exceed this limit, rethink your aggregate boundary. One way would be to split
the aggregate into multiple aggregates. Another would be to make the underlying
entity an aggregate by itself.
41 changes: 41 additions & 0 deletions docs/core-concepts/building-blocks/events.md
@@ -0,0 +1,41 @@
# Events

### Events allow different components to communicate with each other.

Events can be used as a mechanism to implement eventual consistency, within a
bounded context or across bounded contexts. This promotes
loose coupling by decoupling the producer (e.g., an aggregate that raises
an event) from the consumers (e.g., various components that handle the
event).

Such a design eliminates the need for two-phase commits (global
transactions) across bounded contexts, optimizing performance at the level
of each transaction.

### Events act as API contracts.

Events define a clear and consistent structure for data that is shared
between different components of the system. This promotes system-wide
interoperability and integration between components.

### Events help preserve context boundaries.

Events propagate information across bounded contexts, thus helping to
sync changes throughout the application domain. This allows each domain
to be modeled in the architecture pattern that is most appropriate for its
use case.

- Events should be named in past tense, because we observe domain events _after
the fact_. `StockDepleted` is a better choice than the imperative
`DepleteStock` as an event name.
- An event is associated with an aggregate or a stream, specified with the
`part_of` or `stream` parameter of the event decorator. We will
dive deeper into these parameters in the Processing Events section.
<!-- FIXME Add link to events processing section -->
- Events are essentially Data Transfer Objects (DTOs): they can only hold
simple fields and Value Objects.
- Events should only contain information directly relevant to the event. A
receiver that needs more information should be listening to other pertinent
events and add read-only structures to its own state to take decisions later.
A receiver should not query the current state from the sender because the
sender's state could have already mutated.
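
The points above can be sketched with a plain Python frozen dataclass. This is not how events are declared in Protean; `StockDepleted` and its fields are used purely to illustrate a past-tense name, a small payload of simple fields, and immutability.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class StockDepleted:
    """Past-tense name; carries only data directly relevant to the event."""

    product_id: str
    depleted_at: str


event = StockDepleted(product_id="p-1", depleted_at="2024-06-30T19:20:53+00:00")

try:
    event.product_id = "p-2"  # events are facts and cannot be changed
    mutated = True
except FrozenInstanceError:
    mutated = False
```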
174 changes: 130 additions & 44 deletions docs/guides/domain-definition/events.md
@@ -10,33 +10,6 @@ occurrence or change in the domain. Events are raised by aggregates to signal
that something noteworthy has happened, allowing other parts of the system to
react - and sync - to these changes in a decoupled manner.

Events have a few primary functions:

1. **Events allows different components to communicate with each other.**

Within a domain or across, events can be used as a mechanism to implement
eventual consistency, in the same bounded context or across. This promotes
loose coupling by decoupling the producer (e.g., an aggregate that raises
an event) from the consumers (e.g., various components that handle the
event).

Such a design eliminates the need for two-phase commits (global
transactions) across bounded contexts, optimizing performance at the level
of each transaction.

2. **Events act as API contracts.**

Events define a clear and consistent structure for data that is shared
between different components of the system. This promotes system-wide
interoperability and integration between components.

3. **Events help preserve context boundaries.**

Events propagate information across bounded contexts, thus helping to
sync changes throughout the application domain. This allows each domain
to be modeled in the architecture pattern that is most appropriate for its
use case.

## Defining Events

Event names should be descriptive and convey the specific change or occurrence
@@ -54,22 +27,135 @@ Events are always connected to an Aggregate class, specified with the
`part_of` param in the decorator. An exception to this rule is when the
Event class has been marked _Abstract_.

## Key Facts

- Events should be named in past tense, because we observe domain events _after
the fact_. `StockDepleted` is a better choice than the imperative
`DepleteStock` as an event name.
- An event is associated with an aggregate or a stream, specified with
`part_of` or `stream` parameters to the decorator, as above. We will
dive deeper into these parameters in the Processing Events section.
<!-- FIXME Add link to events processing section -->
- Events are essentially Data Transfer Objects (DTO)- they can only hold
simple fields and Value Objects.
- Events should only contain information directly relevant to the event. A
receiver that needs more information should be listening to other pertinent
events and add read-only structures to its own state to take decisions later.
A receiver should not query the current state from the sender because the
sender's state could have already mutated.

## Event Structure

An event is made of three parts:

### Headers

#### `trace_id`

The `trace_id` is a unique identifier in UUID format that connects all
processing originating from a request. Trace IDs provide a detailed view of
the request's journey through the system. They help in understanding the
complete flow of a request, showing each service interaction, the time taken,
and where any delays occur.

### Metadata

An event's metadata provides additional context about the event.

#### `id`

The unique identifier of the event. The event ID is a structured string, of the
format `<domain>.<aggregate>.<version>.<aggregate-id>.<sequence_id>`.
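
As a rough illustration of this structure, the sketch below splits an event ID into its parts. `parse_event_id` and `EventId` are hypothetical helpers, not part of Protean, and the parsing assumes that the domain, aggregate, and version segments contain no dots and that the sequence ID always has the form `<n>.<m>`.

```python
from typing import NamedTuple


class EventId(NamedTuple):
    domain: str
    aggregate: str
    version: str
    aggregate_id: str
    sequence_id: str


def parse_event_id(event_id: str) -> EventId:
    parts = event_id.split(".")
    if len(parts) < 6:
        raise ValueError(f"unexpected event id: {event_id!r}")
    domain, aggregate, version = parts[:3]
    # The sequence id itself contains a dot, so it spans the last two parts.
    sequence_id = ".".join(parts[-2:])
    # Whatever remains in the middle is treated as the aggregate id.
    aggregate_id = ".".join(parts[3:-2])
    return EventId(domain, aggregate, version, aggregate_id, sequence_id)


parsed = parse_event_id("002.User.v1.1.0.1")
```

For the ID `002.User.v1.1.0.1`, the aggregate ID is `1` and the sequence ID is `0.1`.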

#### `timestamp`

The timestamp of event generation.

#### `version`

The version of the event.

#### `sequence_id`

The sequence ID is the version of the aggregate when the event was generated,
along with the sequence number of the event within the update.

For example, if the aggregate was updated twice, the first update would have a
sequence ID of `1.1`, and the second update would have a sequence ID of `2.1`.
If the next update generated two events, then the sequence ID of the second
event would be `3.2`.
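
The numbering in this example can be reproduced with a few lines of plain Python. `sequence_ids` is a hypothetical helper written for illustration; it assumes the first update corresponds to aggregate version 1, as in the example above, and it is not Protean's implementation.

```python
def sequence_ids(events_per_update, first_version=1):
    """Return sequence ids for a series of updates.

    events_per_update[i] is the number of events raised by update i.
    """
    ids = []
    for offset, count in enumerate(events_per_update):
        aggregate_version = first_version + offset
        # Events within one update are numbered from 1.
        for position in range(1, count + 1):
            ids.append(f"{aggregate_version}.{position}")
    return ids


# Two single-event updates followed by an update that raises two events.
ids = sequence_ids([1, 1, 2])
```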

#### `payload_hash`

The hash of the event's payload.

### Payload

The payload is a dictionary of key-value pairs that convey the information
about the event.

The payload is made available as the data in the event. If
you want to extract just the payload, you can use the `payload` property
of the event.

```shell hl_lines="17 19-20"
In [1]: user = User(id="1", email="<EMAIL>", name="<NAME>")

In [2]: user.login()

In [3]: event = user._events[0]

In [4]: event
Out[4]: <UserLoggedIn: UserLoggedIn object ({'_metadata': {'id': '002.User.v1.1.0.1', 'timestamp': '2024-06-30 19:20:53.587542+00:00', 'version': 'v1', 'sequence_id': '0.1', 'payload_hash': 5473995227001335107}, 'user_id': '1'})>

In [5]: event.to_dict()
Out[5]:
{'_metadata': {'id': '002.User.v1.1.0.1',
'timestamp': '2024-06-30 19:20:53.587542+00:00',
'version': 'v1',
'sequence_id': '0.1',
'payload_hash': 5473995227001335107},
'user_id': '1'}

In [6]: event.payload
Out[6]: {'user_id': '1'}
```
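
Judging from the output above, the payload corresponds to the `to_dict()` result without the `_metadata` block. The sketch below shows that relationship on a plain dictionary; `extract_payload` is a hypothetical helper, not a Protean function.

```python
def extract_payload(event_dict: dict) -> dict:
    """Return every key of the event dictionary except `_metadata`."""
    return {key: value for key, value in event_dict.items() if key != "_metadata"}


event_dict = {
    "_metadata": {
        "id": "002.User.v1.1.0.1",
        "timestamp": "2024-06-30 19:20:53.587542+00:00",
        "version": "v1",
        "sequence_id": "0.1",
        "payload_hash": 5473995227001335107,
    },
    "user_id": "1",
}

payload = extract_payload(event_dict)
```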

## Versioning

Because events serve as API contracts of an aggregate with the rest of the
ecosystem, they are versioned to signal changes to the contract.

By default, events have a version of **v1**.

You can specify a version with the `__version__` class attribute:

```python hl_lines="3"
@domain.event(part_of=User)
class UserActivated:
    __version__ = "v2"

    user_id = Identifier(required=True)
    activated_at = DateTime(required=True)
```

The configured version is reflected in the `version` and `id` attributes of the
generated event:

```python hl_lines="34 50 52 66 68"
{! docs_src/guides/domain-definition/events/002.py !}
```
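
One reason to version events is that consumers can branch on the version they receive. The sketch below works on a plain dictionary shaped like the `to_dict()` output; `handle_user_activated` is a hypothetical consumer function, and the `v1` shape without `activated_at` is assumed for illustration only.

```python
def handle_user_activated(event: dict) -> str:
    """Build a message from a UserActivated event, depending on its version."""
    version = event["_metadata"]["version"]
    if version == "v1":
        # Assumed older contract: no activation timestamp in the payload.
        return f"user {event['user_id']} activated"
    if version == "v2":
        return f"user {event['user_id']} activated at {event['activated_at']}"
    raise ValueError(f"unsupported event version: {version}")


v2_event = {
    "_metadata": {"id": "__main__.User.v2.1.0.2", "version": "v2"},
    "user_id": "1",
    "activated_at": "2024-06-30 16:32:59.704063+00:00",
}

message = handle_user_activated(v2_event)
```

Raising on an unknown version makes a contract change visible immediately instead of letting the consumer silently misread the data.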

## Fact Events

A fact event encloses the entire state of the aggregate at that specific point
in time. It contains all of the attributes and values necessary to completely
describe the fact in the context of your business. You can think of a fact
event similarly to how you may think of a row in a database: a complete set of
data pertaining to the row at that point in time.

Fact events enable a pattern known as **Event-carried State Transfer**, which is
one of the best ways to asynchronously distribute immutable state to all
consumers who need it. With fact events, consumers do not have to build up the
state themselves from multiple delta event types, which can be risky and
error-prone, especially as data schemas evolve and change over time. Instead,
they rely on the owning service to compute and produce a fully detailed fact
event.
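
The difference between the two consumption styles can be sketched in plain Python. The event shapes, `apply_delta`, and `apply_fact` are hypothetical and only illustrate the idea; they do not reflect the structure of fact events generated by Protean.

```python
def apply_delta(state: dict, event: dict) -> dict:
    """Rebuild state from delta events; the consumer must know every type."""
    new_state = dict(state)
    if event["type"] == "UserRenamed":
        new_state["name"] = event["name"]
    elif event["type"] == "UserActivated":
        new_state["status"] = "ACTIVE"
    return new_state


def apply_fact(_state: dict, event: dict) -> dict:
    """Replace the local copy with the full state carried by the fact event."""
    return dict(event["state"])


deltas = [
    {"type": "UserRenamed", "name": "Jane"},
    {"type": "UserActivated"},
]
fact = {
    "type": "UserFactEvent",
    "state": {"id": "1", "name": "Jane", "status": "ACTIVE"},
}

from_deltas = {"id": "1", "name": "John", "status": "INACTIVE"}
for delta in deltas:
    from_deltas = apply_delta(from_deltas, delta)

from_fact = apply_fact({}, fact)
```

Both paths arrive at the same state, but the fact-event consumer needs no knowledge of individual delta types and does not break when a new one is introduced.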

Fact events are generated automatically by the framework with the
`fact_events=True` option in the `domain.aggregate` decorator.

The event name of a generated fact event is of the format
`<AggregateName>FactEvent`, and the stream name will be
`<snakecase_aggregate_name>-<fact>-<aggregate_-_id>`.

```python hl_lines="11 38-52"
{! docs_src/guides/domain-definition/events/003.py!}
```

## Immutability

@@ -89,4 +175,4 @@ IncorrectUsageError: {
'Event Objects are immutable and cannot be modified once created'
]
}
```
```
75 changes: 75 additions & 0 deletions docs_src/guides/domain-definition/events/002.py
@@ -0,0 +1,75 @@
import json
from datetime import datetime, timezone

from protean import BaseEvent, Domain
from protean.fields import DateTime, Identifier, String

domain = Domain(__name__)


@domain.aggregate
class User:
    id = Identifier(identifier=True)
    email = String()
    name = String()
    status = String(choices=["INACTIVE", "ACTIVE", "ARCHIVED"], default="INACTIVE")

    def login(self):
        self.raise_(UserLoggedIn(user_id=self.id))

    def activate(self):
        self.status = "ACTIVE"
        self.raise_(UserActivated(user_id=self.id))


@domain.event(part_of="User")
class UserLoggedIn(BaseEvent):
    user_id = Identifier(identifier=True)


@domain.event(part_of="User")
class UserActivated:
    __version__ = "v2"

    user_id = Identifier(required=True)
    activated_at = DateTime(required=True, default=lambda: datetime.now(timezone.utc))


domain.init(traverse=False)
with domain.domain_context():
    user = User(id="1", email="<EMAIL>", name="<NAME>")

    user.login()
    print(json.dumps(user._events[0].to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "__main__.User.v1.1.0.1",
            "timestamp": "2024-06-30 16:29:31.312727+00:00",
            "version": "v1",
            "sequence_id": "0.1",
            "payload_hash": -7433283101704735063
        },
        "user_id": "1"
    }
    """

    user.activate()
    print(json.dumps(user._events[1].to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "__main__.User.v2.1.0.2",
            "timestamp": "2024-06-30 16:32:59.703965+00:00",
            "version": "v2",
            "sequence_id": "0.2",
            "payload_hash": 7340170219237812824
        },
        "user_id": "1",
        "activated_at": "2024-06-30 16:32:59.704063+00:00"
    }
    """

    print(json.dumps(user._events[1].payload, indent=4))