Skip to content

Commit

Permalink
Publish messages on UoW exit
Browse files Browse the repository at this point in the history
Messages that are pushed into brokers within a UoW are actually published only
on UoW exit, adhering to transaction boundaries.
  • Loading branch information
subhashb committed Aug 1, 2024
1 parent 981eef3 commit a9ec708
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/protean/adapters/broker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def publish(self, channel: str, message: dict) -> None:
if current_uow:
logger.debug(f"Recording message {message} in {current_uow} for dispatch")

current_uow.register_message(message)
current_uow.register_message(channel, message)
else:
logger.debug(
f"Publishing message {message} to all brokers registered for channel {channel}"
Expand Down
2 changes: 1 addition & 1 deletion src/protean/adapters/broker/inline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _get_next(self, channel: str) -> dict | None:
"""Get next message in channel"""
if self._messages[channel]:
return self._messages[channel].pop(0)
return {}
return None

def _data_reset(self) -> None:
"""Flush all data in broker instance"""
Expand Down
8 changes: 4 additions & 4 deletions src/protean/core/unit_of_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def commit(self): # noqa: C901
# Push messages to all brokers
# FIXME Send message to its designated broker?
# FIXME Send messages through domain.brokers.publish?
for message in self._messages_to_dispatch:
for channel, message in self._messages_to_dispatch:
for _, broker in self.domain.brokers.items():
broker.publish(message)
broker.publish(channel, message)
self._messages_to_dispatch = [] # Empty after dispatch

events = []
Expand Down Expand Up @@ -161,5 +161,5 @@ def get_session(self, provider_name):
else:
return self._initialize_session(provider_name)

def register_message(self, message): # FIXME Add annotations
self._messages_to_dispatch.append(message)
def register_message(self, channel, message): # FIXME Add annotations
self._messages_to_dispatch.append((channel, message))
4 changes: 2 additions & 2 deletions src/protean/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from collections import defaultdict
from enum import Enum
from typing import Callable, Dict, Union
from typing import Callable, Dict, Type, Union

from protean import fields
from protean.core.command import BaseCommand
Expand Down Expand Up @@ -128,7 +128,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message:
class handle:
"""Class decorator to mark handler methods in EventHandler and CommandHandler classes."""

def __init__(self, target_cls: Union[BaseEvent, BaseCommand]) -> None:
def __init__(self, target_cls: Type[BaseEvent] | Type[BaseCommand]) -> None:
self._target_cls = target_cls

def __call__(self, fn: Callable) -> Callable:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest

from protean.core.aggregate import BaseAggregate
from protean.core.event import BaseEvent
from protean.core.subscriber import BaseSubscriber
from protean.core.unit_of_work import UnitOfWork
from protean.fields import Auto, Integer, String
from protean.utils.globals import current_domain


class Person(BaseAggregate):
Expand All @@ -14,21 +15,20 @@ class Person(BaseAggregate):
def add_newcomer(cls, person_dict):
"""Factory method to add a new Person to the system"""
newcomer = Person(
id=person_dict["id"],
first_name=person_dict["first_name"],
last_name=person_dict["last_name"],
age=person_dict["age"],
)

# Publish Event via the domain
current_domain.publish(
newcomer.raise_(
PersonAdded(
id=newcomer.id,
first_name=newcomer.first_name,
last_name=newcomer.last_name,
age=newcomer.age,
)
)

return newcomer


Expand All @@ -39,13 +39,28 @@ class PersonAdded(BaseEvent):
age = Integer(default=21)


class NotifySSOSubscriber(BaseSubscriber):
"""Subscriber that notifies an external SSO system
that a new person was added into the system
"""
@pytest.fixture(autouse=True)
def register_elements(test_domain):
test_domain.register(Person)
test_domain.register(PersonAdded, part_of=Person)
test_domain.init(traverse=False)


def test_message_push_after_uow_exit(test_domain):
with UnitOfWork():
person = Person.add_newcomer(
{"id": "1", "first_name": "John", "last_name": "Doe", "age": 25}
)

test_domain.repository_for(Person).add(person)
test_domain.publish("person_added", person._events[0].to_dict())

def __call__(self, domain_event_dict):
print("Received Event: ", domain_event_dict)
print("Event class: ", self.meta_.event)
assert test_domain.brokers["default"].get_next("person_added") is None

print("Current domain: ", current_domain.name)
message = test_domain.brokers["default"].get_next("person_added")
assert message is not None
assert message["id"] == "1"
assert message["first_name"] == "John"
assert message["last_name"] == "Doe"
assert message["age"] == 25
assert "_metadata" in message
2 changes: 1 addition & 1 deletion tests/broker/test_publishing_and_retrieving_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_get_next_message(test_domain):

# No more messages, should return an empty dict
retrieved_message = test_domain.brokers["default"].get_next(channel)
assert retrieved_message == {}
assert retrieved_message is None


def test_data_reset(test_domain):
Expand Down

0 comments on commit a9ec708

Please sign in to comment.