Skip to content

Commit

Permalink
Allow publishing multiple events in a single call
Browse files Browse the repository at this point in the history
  • Loading branch information
subhashb committed Jun 10, 2024
1 parent 1b17e5d commit 5b24ed2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 17 deletions.
38 changes: 21 additions & 17 deletions src/protean/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,27 +836,31 @@ def view(self, _cls=None, **kwargs):
# Broker Functionality #
########################

def publish(self, event: BaseEvent) -> None:
def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None:
"""Publish Events to all configured brokers.
Args:
event (BaseEvent): The Event object containing data to be pushed
events (BaseEvent): The Event object containing data to be pushed
"""
# Persist event in Message Store
self.event_store.store.append(event)

self.brokers.publish(event)

if self.config["event_processing"] == EventProcessing.SYNC.value:
# Consume events right-away
handler_classes = self.handlers_for(event)
for handler_cls in handler_classes:
handler_methods = (
handler_cls._handlers[fqn(event.__class__)]
or handler_cls._handlers["$any"]
)
if not isinstance(events, list):
events = [events]

for event in events:
# Persist event in Message Store
self.event_store.store.append(event)

self.brokers.publish(event)

if self.config["event_processing"] == EventProcessing.SYNC.value:
# Consume events right-away
handler_classes = self.handlers_for(event)
for handler_cls in handler_classes:
handler_methods = (
handler_cls._handlers[fqn(event.__class__)]
or handler_cls._handlers["$any"]
)

for handler_method in handler_methods:
handler_method(handler_cls(), event)
for handler_method in handler_methods:
handler_method(handler_cls(), event)

#####################
# Handling Commands #
Expand Down
25 changes: 25 additions & 0 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ def test_that_event_is_persisted_on_publish(self, mocker, test_domain):
assert len(messages) == 1
messages[0].stream_name == "person-1234"

@pytest.mark.eventstore
def test_that_multiple_events_are_persisted_on_publish(self, mocker, test_domain):
test_domain.publish(
[
PersonAdded(
id="1234",
first_name="John",
last_name="Doe",
age=24,
),
PersonAdded(
id="1235",
first_name="Jane",
last_name="Doe",
age=25,
),
]
)

messages = test_domain.event_store.store.read("person")

assert len(messages) == 2
assert messages[0].stream_name == "person-1234"
assert messages[1].stream_name == "person-1235"


class TestBrokerSubscriberInitialization:
def test_that_registered_subscribers_are_initialized(self, test_domain):
Expand Down

0 comments on commit 5b24ed2

Please sign in to comment.