Skip to content

Commit

Permalink
Add Event Handler Documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
subhashb committed Jun 19, 2024
1 parent 7486072 commit 56dd9e6
Show file tree
Hide file tree
Showing 22 changed files with 266 additions and 31 deletions.
51 changes: 51 additions & 0 deletions docs/adapters/repository/elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Elasticsearch

To use Elasticsearch as a database provider, use the below configuration setting:

```toml
[databases.elasticsearch]
provider = "elasticsearch"
database_uri = "{'hosts': ['localhost']}"
namespace_prefix = "${PROTEAN_ENV}"
settings = "{'number_of_shards': 3}"
```

## Options

Additional options for finer control:

### namespace_prefix =IE${lasticsearch instance are prefixed with the specified stri}norexample, if the namespace prefix is `prod`, the index for aggregate
`Person` will be `prod-person`.

### NAMESPACE_SEPARATOR

Custom character to join namespace_prefix =n ${Default} yphen(`-`). For example, with `NAMESPACE_SEPARATOR` as `_` and namespace
prefix as `prod`, the index of aggregate `Person` will be `prod_person`.

### SETTINGS

Index settings passed as-is to Elasticsearch instance.

## Elasticsearch Model

Note that if you supply a custom Elasticsearch Model with an `Index` inner class, the options specified in the
inner class override those at the config level.

In the sample below, with the configuration settings specified above, the options at Aggregate level will be
overridden and the Elasticsearch Model will have the default index value `*` and number of shards as `1`.

```python
class Person(BaseAggregate):
name = String()
about = Text()

class Meta:
schema_name = "people"

class PeopleModel(ElasticsearchModel):
name = Text(fields={"raw": Keyword()})
about = Text()

class Index:
settings = {"number_of_shards": 1}
```
1 change: 0 additions & 1 deletion docs/guides/access-domain/index.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ domain events.
Command Handlers are defined with the `Domain.command_handler` decorator:

```python hl_lines="20-23 47-53"
{! docs_src/guides/access-domain/002.py !}
{! docs_src/guides/change-state/007.py !}
```

## Workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ to eventually make the rest of the system consistent.

A command is defined with the `Domain.command` decorator:

```python hl_lines="12-15"
{! docs_src/guides/access-domain/001.py !}
```python hl_lines="13-16"
{! docs_src/guides/change-state/006.py !}
```

A command is always associated with an aggregate class with the `part_of`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
# Persisting State
# Changing State

## Persisting State

- About Repositories and Repository Pattern

- Different available repositories
- Repository Configuration
- Automatic generation of repositories

## Basic Structure
### Basic Structure

A repository provides three primary methods to interact with the persistence
store:

### **`add`** - Adds a new entity to the persistence store.
#### **`add`** - Adds a new entity to the persistence store.

### **`get`** - Retrieves an entity from the persistence store.
#### **`get`** - Retrieves an entity from the persistence store.

- Persisting aggregates
- Retreiving aggregates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Aggregates are saved into the configured database using `add` method of the
repository.

```python hl_lines="20"
{! docs_src/guides/persist-state/001.py !}
{! docs_src/guides/change-state/001.py !}
```

1. Identity, by default, is a string.
Expand Down Expand Up @@ -44,7 +44,7 @@ This means changes across the aggregate cluster are committed as a single
transaction (assuming the underlying database supports transactions, of course).

```python hl_lines="22-30 33"
{! docs_src/guides/persist-state/002.py !}
{! docs_src/guides/change-state/002.py !}
```

!!!note
Expand All @@ -57,7 +57,7 @@ The `add` method also publishes events to configured brokers upon successfully
persisting to the database.

```python hl_lines="15"
{! docs_src/guides/persist-state/003.py !}
{! docs_src/guides/change-state/003.py !}
```

```shell hl_lines="12-16 21-22"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ An aggregate can be retreived with the repository's `get` method, if you know
its identity:

```python hl_lines="16 20"
{! docs_src/guides/persist-state/001.py !}
{! docs_src/guides/change-state/001.py !}
```

1. Identity is explicitly set to **1**.
Expand All @@ -26,7 +26,7 @@ expected to enclose methods that represent business queries.
Defining a custom repository is straight-forward:

```python hl_lines="16"
{! docs_src/guides/persist-state/004.py !}
{! docs_src/guides/change-state/004.py !}
```

1. The repository is connected to `Person` aggregate through the `part_of`
Expand Down Expand Up @@ -66,6 +66,10 @@ Out[8]:
perform. `adults` is a good name for a method that fetches persons
over the age of 18.

!!!note
A repository can be connected to a specific persistence store by specifying
the `database` parameter.

## Data Acsess Objects (DAO)

You would have observed the query in the repository above was performed on a
Expand All @@ -88,7 +92,7 @@ For the purposes of this guide, assume that the following `Person` aggregates
exist in the database:

```python hl_lines="7-11"
{! docs_src/guides/persist-state/005.py !}
{! docs_src/guides/change-state/005.py !}
```

```shell
Expand Down
File renamed without changes.
68 changes: 66 additions & 2 deletions docs/guides/compose-a-domain/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,74 @@ needed.

## Adapter Configuration

### `database`
### `databases`

Database repositories are used to access the underlying data store for
persisting and retrieving domain objects.

They are defined in the `[databases]` section.

```toml hl_lines="1 4 7"
[databases.default]
provider = "memory"

[databases.memory]
provider = "memory"

[databases.sqlite]
provider = "sqlalchemy"
database = "sqlite"
database_uri = "sqlite:///test.db"
```

You can define as many databases as you need. The default database is identified
by the `default` key, and is used when you do not specify a database name when
accessing the domain.

The only other database defined by default is `memory`, which is the in-memory
stub database provider. You can name all other database definitions as
necessary.

The persistence store defined here is then specified in the `provider` key of
aggregates and entities to assign them a specific database.

```python hl_lines="1"
@domain.aggregate(provider="sqlite") # (1)
class User:
name = String(max_length=50)
email = String(max_length=254)
```

1. `sqlite` is the key of the database definition in the `[databases.sqlite]`
section.

### `cache`

### `broker`

### `event_store`
### `event_store`

## Custom Attributes

Custom attributes can be defined in toml under the `[custom]` section (or
`[tool.protean.custom]` if you are leveraging the `pyproject.toml` file).

Custom attributes are also made available on the domain object directly.

```toml hl_lines="5"
debug = true
testing = true

[custom]
FOO = "bar"
```

```shell hl_lines="3-4 6-7"
In [1]: domain = Domain(__file__)

In [2]: domain.config["custom"]["FOO"]
Out[2]: 'bar'

In [3]: domain.FOO
Out[3]: 'bar'
```
4 changes: 1 addition & 3 deletions docs/guides/domain-behavior/raising-events.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Propagating State
# Raising Events

An aggregate rarely exists in isolation - it's state changes often mean
that other parts of the system of the system have to sync up. In DDD, the
mechanism to accomplish this is through Domain Events.

## Raising Events

When an aggregate mutates, it also (preferably) raises one or more events
to record the state change in time, as well as propagate it within and beyond
the bounded context.
Expand Down
2 changes: 0 additions & 2 deletions docs/guides/persist-state/database-specificity.md

This file was deleted.

65 changes: 65 additions & 0 deletions docs/guides/propagate-state/event-handlers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Event Handlers

Event handlers consume events raised in an aggregate and help sync the state of
the aggregate with other aggregates and other systems. They are the preferred
mechanism to update multiple aggregates.

## Defining an Event Handler

Event Handlers are defined with the `Domain.event_handler` decorator. Below is
a simplified example of an Event Handler connected to `Inventory` aggregate
syncing stock levels corresponding to changes in the `Order` aggregate.

```python hl_lines="26-27 44"
{! docs_src/guides/propagate-state/001.py !}
```

1. `Order` aggregate fires `OrderShipped` event on book being shipped.

2. Event handler picks up the event and updates stock levels in `Inventory`
aggregate.

Simulating a hypothetical example, we can see that the stock levels were
decreased in response to the `OrderShipped` event.

```shell hl_lines="21"
In [1]: order = Order(book_id=1, quantity=10, total_amount=100)

In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [3]: inventory = Inventory(book_id=1, in_stock=100)

In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>

In [5]: order.ship_order()

In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [7]: stock = domain.repository_for(Inventory).get(inventory.id)

In [8]: stock.to_dict()
Out[8]: {
'book_id': '1',
'in_stock': 90,
'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
}

In [9]:
```

## Configuration Options

- **`part_of`**: The aggregate to which the event handler is connected.
- **`stream_name`**: The event handler listens to events on this stream.
The stream name defaults to the aggregate's stream. This option comes handy
when the event handler belongs to an aggregate and needs to listen to another
aggregate's events.
- **`source_stream`**: When specified, the event handler only consumes events
generated in response to events or commands from this original stream.
For example, `EmailNotifications` event handler listening to `OrderShipped`
events can be configured to generate a `NotificationSent` event only when the
`OrderShipped` event (in stream `orders`) is generated in response to a
`ShipOrder` (in stream `manage_order`) command.
1 change: 1 addition & 0 deletions docs/guides/propagate-state/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Propagate State
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
51 changes: 51 additions & 0 deletions docs_src/guides/propagate-state/001.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain(__file__, load_toml=False)
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
order_id = Identifier(required=True)
book_id = Identifier(required=True)
quantity = Integer(required=True)
total_amount = Integer(required=True)


@domain.aggregate
class Order:
book_id = Identifier(required=True)
quantity = Integer(required=True)
total_amount = Integer(required=True)
status = String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

def ship_order(self):
self.status = "SHIPPED"

self.raise_( # (1)
OrderShipped(
order_id=self.id,
book_id=self.book_id,
quantity=self.quantity,
total_amount=self.total_amount,
)
)


@domain.aggregate
class Inventory:
book_id = Identifier(required=True)
in_stock = Integer(required=True)


@domain.event_handler(part_of=Inventory, stream_name="order")
class ManageInventory:
@handle(OrderShipped)
def reduce_stock_level(self, event: OrderShipped):
repo = domain.repository_for(Inventory)
inventory = repo._dao.find_by(book_id=event.book_id)

inventory.in_stock -= event.quantity # (2)

repo.add(inventory)
Loading

0 comments on commit 56dd9e6

Please sign in to comment.