Skip to content

Commit

Permalink
Simplify Broker and Subscriber functionality
Browse files Browse the repository at this point in the history
Previously, brokers in Protean were the primary conduits for events. So a lot of
their functionality included automatic translation of events to messages and back,
and handling Protean elements.

This commit simplifies broker functionality, and retains one primary responsibility:
communicating with a broker (publish and receive).

`Broker.publish` is used to push messages to a broker.
Subcribers connected to channels receive messages from broker.
  • Loading branch information
subhashb committed Jul 31, 2024
1 parent 43ff1ee commit 2643969
Show file tree
Hide file tree
Showing 30 changed files with 331 additions and 912 deletions.
185 changes: 1 addition & 184 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ elasticsearch-dsl = {version = "~7.4.1", optional = true}
redis = {version = "~5.0.7", optional = true}
sqlalchemy = {version = "~2.0.30", optional = true}
psycopg2 = {version = ">=2.9.9", optional = true}
celery = { version = "~5.2.7", extras = ["redis"], optional = true}
flask = {version = ">=1.1.1", optional = true}
sendgrid = {version = ">=6.1.3", optional = true}
message-db-py = {version = ">=0.2.0", optional = true}
Expand All @@ -77,7 +76,6 @@ elasticsearch = ["elasticsearch", "elasticsearch-dsl"]
redis = ["redis"]
postgresql = ["sqlalchemy", "psycopg2"]
sqlite = ["sqlalchemy"]
celery = ["celery"]
message-db = ["message-db-py"]
flask = ["flask"]
sendgrid = ["sendgrid"]
Expand Down
36 changes: 16 additions & 20 deletions src/protean/adapters/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
import collections
import collections.abc
import importlib
import logging

from protean.core.event import BaseEvent
from protean.exceptions import ConfigurationError
from protean.utils.globals import current_uow
from protean.utils.mixins import Message

logger = logging.getLogger(__name__)


BROKER_PROVIDERS = {
"inline": "protean.adapters.InlineBroker",
"redis": "protean.adapters.broker.redis.RedisBroker",
"celery": "protean.adapters.broker.celery.CeleryBroker",
}


class Brokers(collections.abc.MutableMapping):
def __init__(self, domain):
self.domain = domain
self._brokers = None
self._brokers = {}

def __getitem__(self, key):
return self._brokers[key] if self._brokers else None
Expand Down Expand Up @@ -58,40 +55,39 @@ def _initialize(self):
broker_cls = getattr(
importlib.import_module(broker_module), broker_class
)
broker_objects[broker_name] = broker_cls(broker_name, self, conn_info)
broker_objects[broker_name] = broker_cls(
broker_name, self.domain, conn_info
)
else:
raise ConfigurationError("Configure at least one broker in the domain")

self._brokers = broker_objects

# Initialize subscribers for Brokers
for _, subscriber_record in self.domain.registry.subscribers.items():
subscriber = subscriber_record.cls
broker_name = subscriber.meta_.broker
subscriber_cls = subscriber_record.cls
broker_name = subscriber_cls.meta_.broker

if broker_name not in self._brokers:
raise ConfigurationError(
f"Broker `{broker_name}` has not been configured."
)

self._brokers[broker_name].register(subscriber.meta_.event, subscriber)

def publish(self, object: BaseEvent) -> None:
"""Publish an object to all registered brokers"""
message = Message.to_message(object)
self._brokers[broker_name].register(subscriber_cls)

# Follow a naive strategy and dispatch event directly to message broker
def publish(self, channel: str, message: dict) -> None:
"""Publish a message payload to all registered brokers"""
# Follow a naive strategy and dispatch message directly to message broker
# If the operation is enclosed in a Unit of Work, delegate the responsibility
# of publishing the message to the UoW
if current_uow:
logger.debug(
f"Recording {object.__class__.__name__} "
f"with values {object.to_dict()} in {current_uow}"
)
logger.debug(f"Recording message {message} in {current_uow} for dispatch")

current_uow.register_message(message)
else:
logger.debug(
f"Publishing {object.__class__.__name__} with values {object.to_dict()}"
f"Publishing message {message} to all brokers registered for channel {channel}"
)

for _, broker in self._brokers.items():
broker.publish(message)
broker.publish(channel, message)
Loading

0 comments on commit 2643969

Please sign in to comment.