From a9ec7083b54e52591a7d2bb1c6807a76585d74eb Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Wed, 31 Jul 2024 22:10:30 -0700 Subject: [PATCH] Publish messages on UoW exit Messages that are pushed into brokers within a UoW are actually published only on UoW exit, adhering to transaction boundaries. --- src/protean/adapters/broker/__init__.py | 2 +- src/protean/adapters/broker/inline.py | 2 +- src/protean/core/unit_of_work.py | 8 ++-- src/protean/utils/mixins.py | 4 +- .../test_publish_within_uow.py} | 41 +++++++++++++------ ...test_publishing_and_retrieving_messages.py | 2 +- 6 files changed, 37 insertions(+), 22 deletions(-) rename tests/{adapters/broker/redis_broker/elements.py => broker/test_publish_within_uow.py} (50%) diff --git a/src/protean/adapters/broker/__init__.py b/src/protean/adapters/broker/__init__.py index 6805c736..d5f7774a 100644 --- a/src/protean/adapters/broker/__init__.py +++ b/src/protean/adapters/broker/__init__.py @@ -83,7 +83,7 @@ def publish(self, channel: str, message: dict) -> None: if current_uow: logger.debug(f"Recording message {message} in {current_uow} for dispatch") - current_uow.register_message(message) + current_uow.register_message(channel, message) else: logger.debug( f"Publishing message {message} to all brokers registered for channel {channel}" diff --git a/src/protean/adapters/broker/inline.py b/src/protean/adapters/broker/inline.py index 2ff647d5..a3368e3b 100644 --- a/src/protean/adapters/broker/inline.py +++ b/src/protean/adapters/broker/inline.py @@ -27,7 +27,7 @@ def _get_next(self, channel: str) -> dict | None: """Get next message in channel""" if self._messages[channel]: return self._messages[channel].pop(0) - return {} + return None def _data_reset(self) -> None: """Flush all data in broker instance""" diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index 71a3781e..e8bb5f63 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -70,9 +70,9 @@ def commit(self): # noqa: C901 # Push messages to all brokers # FIXME Send message to its designated broker? # FIXME Send messages through domain.brokers.publish? - for message in self._messages_to_dispatch: + for channel, message in self._messages_to_dispatch: for _, broker in self.domain.brokers.items(): - broker.publish(message) + broker.publish(channel, message) self._messages_to_dispatch = [] # Empty after dispatch events = [] @@ -161,5 +161,5 @@ def get_session(self, provider_name): else: return self._initialize_session(provider_name) - def register_message(self, message): # FIXME Add annotations - self._messages_to_dispatch.append(message) + def register_message(self, channel, message): # FIXME Add annotations + self._messages_to_dispatch.append((channel, message)) diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index 60647389..1b2e9cb2 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -4,7 +4,7 @@ import logging from collections import defaultdict from enum import Enum -from typing import Callable, Dict, Union +from typing import Callable, Dict, Type, Union from protean import fields from protean.core.command import BaseCommand @@ -128,7 +128,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: class handle: """Class decorator to mark handler methods in EventHandler and CommandHandler classes.""" - def __init__(self, target_cls: Union[BaseEvent, BaseCommand]) -> None: + def __init__(self, target_cls: Type[BaseEvent] | Type[BaseCommand]) -> None: self._target_cls = target_cls def __call__(self, fn: Callable) -> Callable: diff --git a/tests/adapters/broker/redis_broker/elements.py b/tests/broker/test_publish_within_uow.py similarity index 50% rename from tests/adapters/broker/redis_broker/elements.py rename to tests/broker/test_publish_within_uow.py index 58242942..7aef3129 100644 --- a/tests/adapters/broker/redis_broker/elements.py +++ b/tests/broker/test_publish_within_uow.py @@ -1,8 +1,9 @@ +import pytest + from protean.core.aggregate import BaseAggregate from protean.core.event import BaseEvent -from protean.core.subscriber import BaseSubscriber +from protean.core.unit_of_work import UnitOfWork from protean.fields import Auto, Integer, String -from protean.utils.globals import current_domain class Person(BaseAggregate): @@ -14,13 +15,13 @@ class Person(BaseAggregate): def add_newcomer(cls, person_dict): """Factory method to add a new Person to the system""" newcomer = Person( + id=person_dict["id"], first_name=person_dict["first_name"], last_name=person_dict["last_name"], age=person_dict["age"], ) - # Publish Event via the domain - current_domain.publish( + newcomer.raise_( PersonAdded( id=newcomer.id, first_name=newcomer.first_name, @@ -28,7 +29,6 @@ def add_newcomer(cls, person_dict): age=newcomer.age, ) ) - return newcomer @@ -39,13 +39,28 @@ class PersonAdded(BaseEvent): age = Integer(default=21) -class NotifySSOSubscriber(BaseSubscriber): - """Subscriber that notifies an external SSO system - that a new person was added into the system - """ +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(Person) + test_domain.register(PersonAdded, part_of=Person) + test_domain.init(traverse=False) + + +def test_message_push_after_uow_exit(test_domain): + with UnitOfWork(): + person = Person.add_newcomer( + {"id": "1", "first_name": "John", "last_name": "Doe", "age": 25} + ) + + test_domain.repository_for(Person).add(person) + test_domain.publish("person_added", person._events[0].to_dict()) - def __call__(self, domain_event_dict): - print("Received Event: ", domain_event_dict) - print("Event class: ", self.meta_.event) + assert test_domain.brokers["default"].get_next("person_added") is None - print("Current domain: ", current_domain.name) + message = test_domain.brokers["default"].get_next("person_added") + assert message is not None + assert message["id"] == "1" + assert message["first_name"] == "John" + assert message["last_name"] == "Doe" + assert message["age"] == 25 + assert "_metadata" in message diff --git a/tests/broker/test_publishing_and_retrieving_messages.py b/tests/broker/test_publishing_and_retrieving_messages.py index aac61568..983dc094 100644 --- a/tests/broker/test_publishing_and_retrieving_messages.py +++ b/tests/broker/test_publishing_and_retrieving_messages.py @@ -26,7 +26,7 @@ def test_get_next_message(test_domain): # No more messages, should return an empty dict retrieved_message = test_domain.brokers["default"].get_next(channel) - assert retrieved_message == {} + assert retrieved_message is None def test_data_reset(test_domain):