From 5b24ed209ef5eb8a2a3fb1364f1e0d0726074eab Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Mon, 10 Jun 2024 15:40:54 -0700 Subject: [PATCH] Allow publishing multiple events in a single call --- src/protean/domain/__init__.py | 38 +++++++++++++++++++--------------- tests/test_brokers.py | 25 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index ad4ac589..3c302ea0 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -836,27 +836,31 @@ def view(self, _cls=None, **kwargs): # Broker Functionality # ######################## - def publish(self, event: BaseEvent) -> None: + def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None: """Publish Events to all configured brokers. Args: - event (BaseEvent): The Event object containing data to be pushed + events (BaseEvent): The Event object containing data to be pushed """ - # Persist event in Message Store - self.event_store.store.append(event) - - self.brokers.publish(event) - - if self.config["event_processing"] == EventProcessing.SYNC.value: - # Consume events right-away - handler_classes = self.handlers_for(event) - for handler_cls in handler_classes: - handler_methods = ( - handler_cls._handlers[fqn(event.__class__)] - or handler_cls._handlers["$any"] - ) + if not isinstance(events, list): + events = [events] + + for event in events: + # Persist event in Message Store + self.event_store.store.append(event) + + self.brokers.publish(event) + + if self.config["event_processing"] == EventProcessing.SYNC.value: + # Consume events right-away + handler_classes = self.handlers_for(event) + for handler_cls in handler_classes: + handler_methods = ( + handler_cls._handlers[fqn(event.__class__)] + or handler_cls._handlers["$any"] + ) - for handler_method in handler_methods: - handler_method(handler_cls(), event) + for handler_method in handler_methods: + handler_method(handler_cls(), event) ##################### # Handling Commands # diff --git a/tests/test_brokers.py b/tests/test_brokers.py index d3671640..11b6b391 100644 --- a/tests/test_brokers.py +++ b/tests/test_brokers.py @@ -117,6 +117,31 @@ def test_that_event_is_persisted_on_publish(self, mocker, test_domain): assert len(messages) == 1 messages[0].stream_name == "person-1234" + @pytest.mark.eventstore + def test_that_multiple_events_are_persisted_on_publish(self, mocker, test_domain): + test_domain.publish( + [ + PersonAdded( + id="1234", + first_name="John", + last_name="Doe", + age=24, + ), + PersonAdded( + id="1235", + first_name="Jane", + last_name="Doe", + age=25, + ), + ] + ) + + messages = test_domain.event_store.store.read("person") + + assert len(messages) == 2 + assert messages[0].stream_name == "person-1234" + assert messages[1].stream_name == "person-1235" + class TestBrokerSubscriberInitialization: def test_that_registered_subscribers_are_initialized(self, test_domain):