Skip to content

Commit

Permalink
Simplify Broker and Subscriber functionality (#450)
Browse files Browse the repository at this point in the history
Previously, brokers in Protean were the primary conduits for events. So, a lot of
their functionality included automatic translation of events to messages and back,
and handling Protean elements.

This commit simplifies broker functionality and retains one primary responsibility: communicating with a broker (publish and receive).

* `Broker.publish()` is used to push messages to a broker.
* Subscribers connected to channels receive messages from brokers.
* Publish messages on UoW exit: Messages pushed into brokers within a UoW are published only
on UoW exit, adhering to transaction boundaries.
* Add type annotations to the broker adapter base
  • Loading branch information
subhashb authored Aug 1, 2024
1 parent 43ff1ee commit 5fd6580
Show file tree
Hide file tree
Showing 33 changed files with 425 additions and 944 deletions.
186 changes: 1 addition & 185 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ elasticsearch-dsl = {version = "~7.4.1", optional = true}
redis = {version = "~5.0.7", optional = true}
sqlalchemy = {version = "~2.0.30", optional = true}
psycopg2 = {version = ">=2.9.9", optional = true}
celery = { version = "~5.2.7", extras = ["redis"], optional = true}
flask = {version = ">=1.1.1", optional = true}
sendgrid = {version = ">=6.1.3", optional = true}
message-db-py = {version = ">=0.2.0", optional = true}
Expand All @@ -77,7 +76,6 @@ elasticsearch = ["elasticsearch", "elasticsearch-dsl"]
redis = ["redis"]
postgresql = ["sqlalchemy", "psycopg2"]
sqlite = ["sqlalchemy"]
celery = ["celery"]
message-db = ["message-db-py"]
flask = ["flask"]
sendgrid = ["sendgrid"]
Expand Down
66 changes: 33 additions & 33 deletions src/protean/adapters/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,48 @@
import collections
from __future__ import annotations

import collections.abc
import importlib
import logging
from typing import TYPE_CHECKING, Any, Iterator

from protean.core.event import BaseEvent
from protean.exceptions import ConfigurationError
from protean.port.broker import BaseBroker
from protean.utils.globals import current_uow
from protean.utils.mixins import Message

if TYPE_CHECKING:
from protean.domain import Domain

logger = logging.getLogger(__name__)


BROKER_PROVIDERS = {
"inline": "protean.adapters.InlineBroker",
"redis": "protean.adapters.broker.redis.RedisBroker",
"celery": "protean.adapters.broker.celery.CeleryBroker",
}


class Brokers(collections.abc.MutableMapping):
def __init__(self, domain):
class Brokers(collections.abc.MutableMapping[str, BaseBroker]):
def __init__(self, domain: "Domain"):
self.domain = domain
self._brokers = None
self._brokers: dict[str, BaseBroker] = {}

def __getitem__(self, key):
return self._brokers[key] if self._brokers else None
def __getitem__(self, key: str) -> BaseBroker:
return self._brokers[key]

def __iter__(self):
def __iter__(self) -> Iterator[str]:
return iter(self._brokers) if self._brokers else iter({})

def __len__(self):
def __len__(self) -> int:
return len(self._brokers) if self._brokers else 0

def __setitem__(self, key, value):
if self._brokers is None:
self._brokers = {}

def __setitem__(self, key: str, value: BaseBroker) -> None:
self._brokers[key] = value

def __delitem__(self, key):
def __delitem__(self, key: str) -> None:
if key in self._brokers:
del self._brokers[key]

def _initialize(self):
def _initialize(self) -> None:
"""Read config file and initialize brokers"""
configured_brokers = self.domain.config["brokers"]
broker_objects = {}
Expand All @@ -58,40 +59,39 @@ def _initialize(self):
broker_cls = getattr(
importlib.import_module(broker_module), broker_class
)
broker_objects[broker_name] = broker_cls(broker_name, self, conn_info)
broker_objects[broker_name] = broker_cls(
broker_name, self.domain, conn_info
)
else:
raise ConfigurationError("Configure at least one broker in the domain")

self._brokers = broker_objects

# Initialize subscribers for Brokers
for _, subscriber_record in self.domain.registry.subscribers.items():
subscriber = subscriber_record.cls
broker_name = subscriber.meta_.broker
subscriber_cls = subscriber_record.cls
broker_name = subscriber_cls.meta_.broker

if broker_name not in self._brokers:
raise ConfigurationError(
f"Broker `{broker_name}` has not been configured."
)

self._brokers[broker_name].register(subscriber.meta_.event, subscriber)

def publish(self, object: BaseEvent) -> None:
"""Publish an object to all registered brokers"""
message = Message.to_message(object)
self._brokers[broker_name].register(subscriber_cls)

# Follow a naive strategy and dispatch event directly to message broker
def publish(self, channel: str, message: dict[str, Any]) -> None:
"""Publish a message payload to all registered brokers"""
# Follow a naive strategy and dispatch message directly to message broker
# If the operation is enclosed in a Unit of Work, delegate the responsibility
# of publishing the message to the UoW
if current_uow:
logger.debug(
f"Recording {object.__class__.__name__} "
f"with values {object.to_dict()} in {current_uow}"
)
current_uow.register_message(message)
logger.debug(f"Recording message {message} in {current_uow} for dispatch")

current_uow.register_message(channel, message)
else:
logger.debug(
f"Publishing {object.__class__.__name__} with values {object.to_dict()}"
f"Publishing message {message} to all brokers registered for channel {channel}"
)

for _, broker in self._brokers.items():
broker.publish(message)
broker.publish(channel, message)
Loading

0 comments on commit 5fd6580

Please sign in to comment.