From 5fd65809639e9ac9a44248109ae3f43dd68592e5 Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Thu, 1 Aug 2024 08:38:34 -0700 Subject: [PATCH] Simplify Broker and Subscriber functionality (#450) Previously, brokers in Protean were the primary conduits for events. So, a lot of their functionality included automatic translation of events to messages and back, and handling Protean elements. This commit simplifies broker functionality and retains one primary responsibility: communicating with a broker (publish and receive). * `Broker.publish()` is used to push messages to a broker. * Subscribers connected to channels receive messages from brokers. * Publish messages on UoW exit: Messages pushed into brokers within a UoW are published only on UoW exit, adhering to transaction boundaries. * Add type annotations to the broker adapter base --- poetry.lock | 186 +----------------- pyproject.toml | 2 - src/protean/adapters/broker/__init__.py | 66 +++---- src/protean/adapters/broker/celery.py | 117 ----------- src/protean/adapters/broker/inline.py | 27 +-- src/protean/adapters/broker/redis.py | 11 +- src/protean/core/subscriber.py | 16 +- src/protean/core/unit_of_work.py | 13 +- src/protean/domain/__init__.py | 27 +-- src/protean/domain/config.py | 7 +- src/protean/port/broker.py | 90 +++++---- src/protean/template/copier.yml | 1 - src/protean/utils/__init__.py | 26 ++- src/protean/utils/mixins.py | 4 +- .../adapters/broker/celery_broker/__init__.py | 0 .../adapters/broker/celery_broker/conftest.py | 12 -- .../adapters/broker/celery_broker/domain.toml | 4 - .../adapters/broker/celery_broker/elements.py | 51 ----- .../broker/celery_broker/test_subscriber.py | 44 ----- tests/adapters/broker/celery_broker/tests.py | 37 ---- tests/adapters/broker/redis_broker/tests.py | 80 ++------ tests/broker/test_initialization.py | 85 ++++++++ tests/broker/test_initializing_subscribers.py | 31 +++ tests/broker/test_publish_to_all_brokers.py | 18 ++ .../test_publish_within_uow.py} | 41 
++-- ...test_publishing_and_retrieving_messages.py | 55 ++++++ tests/broker/test_sync_processing.py | 28 +++ .../test_inline_command_processing.py | 8 +- ...test_inline_event_processing_on_publish.py | 71 ------- tests/server/test_engine_run.py | 4 +- tests/test_brokers.py | 180 ----------------- tests/test_subscribers.py | 17 +- tests/workflows/test_event_flows.py | 10 +- 33 files changed, 425 insertions(+), 944 deletions(-) delete mode 100644 src/protean/adapters/broker/celery.py delete mode 100644 tests/adapters/broker/celery_broker/__init__.py delete mode 100644 tests/adapters/broker/celery_broker/conftest.py delete mode 100644 tests/adapters/broker/celery_broker/domain.toml delete mode 100644 tests/adapters/broker/celery_broker/elements.py delete mode 100644 tests/adapters/broker/celery_broker/test_subscriber.py delete mode 100644 tests/adapters/broker/celery_broker/tests.py create mode 100644 tests/broker/test_initialization.py create mode 100644 tests/broker/test_initializing_subscribers.py create mode 100644 tests/broker/test_publish_to_all_brokers.py rename tests/{adapters/broker/redis_broker/elements.py => broker/test_publish_within_uow.py} (50%) create mode 100644 tests/broker/test_publishing_and_retrieving_messages.py create mode 100644 tests/broker/test_sync_processing.py delete mode 100644 tests/event_store/test_inline_event_processing_on_publish.py delete mode 100644 tests/test_brokers.py diff --git a/poetry.lock b/poetry.lock index 61d7ce3f..077a7cae 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,19 +1,5 @@ # This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. -[[package]] -name = "amqp" -version = "5.2.0" -description = "Low-level AMQP client for Python (fork of amqplib)." 
-optional = true -python-versions = ">=3.6" -files = [ - {file = "amqp-5.2.0-py3-none-any.whl", hash = "sha256:827cb12fb0baa892aad844fd95258143bce4027fdac4fccddbc43330fd281637"}, - {file = "amqp-5.2.0.tar.gz", hash = "sha256:a1ecff425ad063ad42a486c902807d1482311481c8ad95a72694b2975e75f7fd"}, -] - -[package.dependencies] -vine = ">=5.0.0,<6.0.0" - [[package]] name = "annotated-types" version = "0.6.0" @@ -68,17 +54,6 @@ files = [ [package.extras] dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"] -[[package]] -name = "billiard" -version = "3.6.4.0" -description = "Python multiprocessing fork with improvements and bugfixes" -optional = true -python-versions = "*" -files = [ - {file = "billiard-3.6.4.0-py3-none-any.whl", hash = "sha256:87103ea78fa6ab4d5c751c4909bcff74617d985de7fa8b672cf8618afd5a875b"}, - {file = "billiard-3.6.4.0.tar.gz", hash = "sha256:299de5a8da28a783d51b197d496bef4f1595dd023a93a4f59dde1886ae905547"}, -] - [[package]] name = "bleach" version = "6.1.0" @@ -130,61 +105,6 @@ test = ["filelock (>=3)", "pytest (>=6.2.4)", "pytest-cov (>=2.12)", "pytest-moc typing = ["importlib-metadata (>=5.1)", "mypy (>=1.5.0,<1.6.0)", "tomli", "typing-extensions (>=3.7.4.3)"] virtualenv = ["virtualenv (>=20.0.35)"] -[[package]] -name = "celery" -version = "5.2.7" -description = "Distributed Task Queue." 
-optional = true -python-versions = ">=3.7" -files = [ - {file = "celery-5.2.7-py3-none-any.whl", hash = "sha256:138420c020cd58d6707e6257b6beda91fd39af7afde5d36c6334d175302c0e14"}, - {file = "celery-5.2.7.tar.gz", hash = "sha256:fafbd82934d30f8a004f81e8f7a062e31413a23d444be8ee3326553915958c6d"}, -] - -[package.dependencies] -billiard = ">=3.6.4.0,<4.0" -click = ">=8.0.3,<9.0" -click-didyoumean = ">=0.0.3" -click-plugins = ">=1.1.1" -click-repl = ">=0.2.0" -kombu = ">=5.2.3,<6.0" -pytz = ">=2021.3" -redis = {version = ">=3.4.1,<4.0.0 || >4.0.0,<4.0.1 || >4.0.1", optional = true, markers = "extra == \"redis\""} -vine = ">=5.0.0,<6.0" - -[package.extras] -arangodb = ["pyArango (>=1.3.2)"] -auth = ["cryptography"] -azureblockblob = ["azure-storage-blob (==12.9.0)"] -brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"] -cassandra = ["cassandra-driver (<3.21.0)"] -consul = ["python-consul2"] -cosmosdbsql = ["pydocumentdb (==2.3.2)"] -couchbase = ["couchbase (>=3.0.0)"] -couchdb = ["pycouchdb"] -django = ["Django (>=1.11)"] -dynamodb = ["boto3 (>=1.9.178)"] -elasticsearch = ["elasticsearch"] -eventlet = ["eventlet (>=0.32.0)"] -gevent = ["gevent (>=1.5.0)"] -librabbitmq = ["librabbitmq (>=1.5.0)"] -memcache = ["pylibmc"] -mongodb = ["pymongo[srv] (>=3.11.1)"] -msgpack = ["msgpack"] -pymemcache = ["python-memcached"] -pyro = ["pyro4"] -pytest = ["pytest-celery"] -redis = ["redis (>=3.4.1,!=4.0.0,!=4.0.1)"] -s3 = ["boto3 (>=1.9.125)"] -slmq = ["softlayer-messaging (>=1.0.3)"] -solar = ["ephem"] -sqlalchemy = ["sqlalchemy"] -sqs = ["kombu[sqs]"] -tblib = ["tblib (>=1.3.0)", "tblib (>=1.5.0)"] -yaml = ["PyYAML (>=3.10)"] -zookeeper = ["kazoo (>=1.3.1)"] -zstd = ["zstandard"] - [[package]] name = "certifi" version = "2024.7.4" @@ -402,55 +322,6 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} -[[package]] -name = "click-didyoumean" -version = "0.3.0" -description = "Enables git-like *did-you-mean* feature in click" 
-optional = true -python-versions = ">=3.6.2,<4.0.0" -files = [ - {file = "click-didyoumean-0.3.0.tar.gz", hash = "sha256:f184f0d851d96b6d29297354ed981b7dd71df7ff500d82fa6d11f0856bee8035"}, - {file = "click_didyoumean-0.3.0-py3-none-any.whl", hash = "sha256:a0713dc7a1de3f06bc0df5a9567ad19ead2d3d5689b434768a6145bff77c0667"}, -] - -[package.dependencies] -click = ">=7" - -[[package]] -name = "click-plugins" -version = "1.1.1" -description = "An extension module for click to enable registering CLI commands via setuptools entry-points." -optional = true -python-versions = "*" -files = [ - {file = "click-plugins-1.1.1.tar.gz", hash = "sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b"}, - {file = "click_plugins-1.1.1-py2.py3-none-any.whl", hash = "sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"}, -] - -[package.dependencies] -click = ">=4.0" - -[package.extras] -dev = ["coveralls", "pytest (>=3.6)", "pytest-cov", "wheel"] - -[[package]] -name = "click-repl" -version = "0.3.0" -description = "REPL plugin for Click" -optional = true -python-versions = ">=3.6" -files = [ - {file = "click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9"}, - {file = "click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812"}, -] - -[package.dependencies] -click = ">=7.0" -prompt-toolkit = ">=3.0.36" - -[package.extras] -testing = ["pytest (>=7.2.1)", "pytest-cov (>=4.0.0)", "tox (>=4.4.3)"] - [[package]] name = "colorama" version = "0.4.6" @@ -1080,38 +951,6 @@ completion = ["shtab (>=1.1.0)"] docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy", "pytest-ruff (>=0.2.1)"] -[[package]] -name = "kombu" -version = "5.3.5" -description = 
"Messaging library for Python." -optional = true -python-versions = ">=3.8" -files = [ - {file = "kombu-5.3.5-py3-none-any.whl", hash = "sha256:0eac1bbb464afe6fb0924b21bf79460416d25d8abc52546d4f16cad94f789488"}, - {file = "kombu-5.3.5.tar.gz", hash = "sha256:30e470f1a6b49c70dc6f6d13c3e4cc4e178aa6c469ceb6bcd55645385fc84b93"}, -] - -[package.dependencies] -amqp = ">=5.1.1,<6.0.0" -vine = "*" - -[package.extras] -azureservicebus = ["azure-servicebus (>=7.10.0)"] -azurestoragequeues = ["azure-identity (>=1.12.0)", "azure-storage-queue (>=12.6.0)"] -confluentkafka = ["confluent-kafka (>=2.2.0)"] -consul = ["python-consul2"] -librabbitmq = ["librabbitmq (>=2.0.0)"] -mongodb = ["pymongo (>=4.1.1)"] -msgpack = ["msgpack"] -pyro = ["pyro4"] -qpid = ["qpid-python (>=0.26)", "qpid-tools (>=0.26)"] -redis = ["redis (>=4.5.2,!=4.5.5,<6.0.0)"] -slmq = ["softlayer-messaging (>=1.0.3)"] -sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] -sqs = ["boto3 (>=1.26.143)", "pycurl (>=7.43.0.5)", "urllib3 (>=1.26.16)"] -yaml = ["PyYAML (>=3.10)"] -zookeeper = ["kazoo (>=2.8.0)"] - [[package]] name = "markdown" version = "3.6" @@ -1964,17 +1803,6 @@ files = [ {file = "python_http_client-3.3.7.tar.gz", hash = "sha256:bf841ee45262747e00dec7ee9971dfb8c7d83083f5713596488d67739170cea0"}, ] -[[package]] -name = "pytz" -version = "2024.1" -description = "World timezone definitions, modern and historical" -optional = true -python-versions = "*" -files = [ - {file = "pytz-2024.1-py2.py3-none-any.whl", hash = "sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319"}, - {file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"}, -] - [[package]] name = "pywin32" version = "306" @@ -2685,17 +2513,6 @@ brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotl secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] socks = ["PySocks 
(>=1.5.6,!=1.5.7,<2.0)"] -[[package]] -name = "vine" -version = "5.1.0" -description = "Python promises." -optional = true -python-versions = ">=3.6" -files = [ - {file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"}, - {file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"}, -] - [[package]] name = "virtualenv" version = "20.25.1" @@ -2812,7 +2629,6 @@ doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linke test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] [extras] -celery = ["celery"] elasticsearch = ["elasticsearch", "elasticsearch-dsl"] flask = ["flask"] message-db = ["message-db-py"] @@ -2824,4 +2640,4 @@ sqlite = ["sqlalchemy"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "52c7efabe6d47911cb47c4822b338bccf66510fb8c243e57369aafb3f6521740" +content-hash = "b7649bf7876f1d0ed6c8cf9a8c531e0a9c59beb63a7dd845ae68aecbdcfa87cf" diff --git a/pyproject.toml b/pyproject.toml index 2d79de76..ad6e56fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,7 +67,6 @@ elasticsearch-dsl = {version = "~7.4.1", optional = true} redis = {version = "~5.0.7", optional = true} sqlalchemy = {version = "~2.0.30", optional = true} psycopg2 = {version = ">=2.9.9", optional = true} -celery = { version = "~5.2.7", extras = ["redis"], optional = true} flask = {version = ">=1.1.1", optional = true} sendgrid = {version = ">=6.1.3", optional = true} message-db-py = {version = ">=0.2.0", optional = true} @@ -77,7 +76,6 @@ elasticsearch = ["elasticsearch", "elasticsearch-dsl"] redis = ["redis"] postgresql = ["sqlalchemy", "psycopg2"] sqlite = ["sqlalchemy"] -celery = ["celery"] message-db = ["message-db-py"] flask = ["flask"] 
sendgrid = ["sendgrid"] diff --git a/src/protean/adapters/broker/__init__.py b/src/protean/adapters/broker/__init__.py index 02c721ad..ac028b2a 100644 --- a/src/protean/adapters/broker/__init__.py +++ b/src/protean/adapters/broker/__init__.py @@ -1,11 +1,16 @@ -import collections +from __future__ import annotations + +import collections.abc import importlib import logging +from typing import TYPE_CHECKING, Any, Iterator -from protean.core.event import BaseEvent from protean.exceptions import ConfigurationError +from protean.port.broker import BaseBroker from protean.utils.globals import current_uow -from protean.utils.mixins import Message + +if TYPE_CHECKING: + from protean.domain import Domain logger = logging.getLogger(__name__) @@ -13,35 +18,31 @@ BROKER_PROVIDERS = { "inline": "protean.adapters.InlineBroker", "redis": "protean.adapters.broker.redis.RedisBroker", - "celery": "protean.adapters.broker.celery.CeleryBroker", } -class Brokers(collections.abc.MutableMapping): - def __init__(self, domain): +class Brokers(collections.abc.MutableMapping[str, BaseBroker]): + def __init__(self, domain: "Domain"): self.domain = domain - self._brokers = None + self._brokers: dict[str, BaseBroker] = {} - def __getitem__(self, key): - return self._brokers[key] if self._brokers else None + def __getitem__(self, key: str) -> BaseBroker: + return self._brokers[key] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._brokers) if self._brokers else iter({}) - def __len__(self): + def __len__(self) -> int: return len(self._brokers) if self._brokers else 0 - def __setitem__(self, key, value): - if self._brokers is None: - self._brokers = {} - + def __setitem__(self, key: str, value: BaseBroker) -> None: self._brokers[key] = value - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: if key in self._brokers: del self._brokers[key] - def _initialize(self): + def _initialize(self) -> None: """Read config file and initialize brokers""" 
configured_brokers = self.domain.config["brokers"] broker_objects = {} @@ -58,7 +59,9 @@ def _initialize(self): broker_cls = getattr( importlib.import_module(broker_module), broker_class ) - broker_objects[broker_name] = broker_cls(broker_name, self, conn_info) + broker_objects[broker_name] = broker_cls( + broker_name, self.domain, conn_info + ) else: raise ConfigurationError("Configure at least one broker in the domain") @@ -66,32 +69,29 @@ def _initialize(self): # Initialize subscribers for Brokers for _, subscriber_record in self.domain.registry.subscribers.items(): - subscriber = subscriber_record.cls - broker_name = subscriber.meta_.broker + subscriber_cls = subscriber_record.cls + broker_name = subscriber_cls.meta_.broker if broker_name not in self._brokers: raise ConfigurationError( f"Broker `{broker_name}` has not been configured." ) - self._brokers[broker_name].register(subscriber.meta_.event, subscriber) - - def publish(self, object: BaseEvent) -> None: - """Publish an object to all registered brokers""" - message = Message.to_message(object) + self._brokers[broker_name].register(subscriber_cls) - # Follow a naive strategy and dispatch event directly to message broker + def publish(self, channel: str, message: dict[str, Any]) -> None: + """Publish a message payload to all registered brokers""" + # Follow a naive strategy and dispatch message directly to message broker # If the operation is enclosed in a Unit of Work, delegate the responsibility # of publishing the message to the UoW if current_uow: - logger.debug( - f"Recording {object.__class__.__name__} " - f"with values {object.to_dict()} in {current_uow}" - ) - current_uow.register_message(message) + logger.debug(f"Recording message {message} in {current_uow} for dispatch") + + current_uow.register_message(channel, message) else: logger.debug( - f"Publishing {object.__class__.__name__} with values {object.to_dict()}" + f"Publishing message {message} to all brokers registered for channel {channel}" ) + 
for _, broker in self._brokers.items(): - broker.publish(message) + broker.publish(channel, message) diff --git a/src/protean/adapters/broker/celery.py b/src/protean/adapters/broker/celery.py deleted file mode 100644 index c62a31c5..00000000 --- a/src/protean/adapters/broker/celery.py +++ /dev/null @@ -1,117 +0,0 @@ -import logging -import logging.config -from collections.abc import Iterable -from typing import Dict - -from celery import Celery, Task -from kombu import Queue - -from protean.port.broker import BaseBroker -from protean.utils import ( - DomainObjects, - fully_qualified_name, -) -from protean.utils.inflection import underscore -from protean.utils.mixins import Message - -logger = logging.getLogger(__name__) - - -class ProteanTask(Task): - """The default base class for all Task classes constructed from Subscribers/Command Handlers.""" - - -class CeleryBroker(BaseBroker): - def __init__(self, name, domain, conn_info): - super().__init__(name, domain, conn_info) - self.celery_app = Celery( - broker=conn_info["URI"], - backend=conn_info["URI"], - ) - - self.celery_app.conf.update(enable_utc=True) - - # We construct queues dynamically when subscribers register - self.queues = [] - - def construct_and_register_celery_task(self, consumer_cls): - """Constructs a Celery-compliant Task class and also registers - Task with Celery App - - Arguments: - consumer_cls {BaseSubscriber} -- The Subscriber or Command Handler class - to be converted into a Celery Task - - Returns: - ProteanTask -- Decorated and Registered Celery Task class - """ - attrs = consumer_cls.__dict__ - custom_attrs = { - "run": attrs["__call__"], # `notify` is the method to run on event - "name": underscore( - fully_qualified_name(consumer_cls) - ), # `name` will be the same as the task's queue - } - attrs = {**attrs, **custom_attrs} - - # Construct `decorated_cls` dynamically from `ProteanTask`. - # `ProteanTask` acts as the base class for all celery tasks. 
- decorated_cls = type(consumer_cls.__name__ + "Task", (ProteanTask,), {**attrs}) - - # Register Task class with Celery app - decorated_cls_instance = self.celery_app.register_task(decorated_cls()) - - # Add to Queue so that workers pick it up automatically - self.queues.append(Queue(decorated_cls.name)) - - return decorated_cls_instance - - def register(self, initiator_cls, consumer_cls): - """Registers Events and Commands with Subscribers/Command Handlers - - Arguments: - initiator_cls {list} -- One or more Events or Commands - consumer_cls {Subscriber/CommandHandler} -- The consumer class connected to the Event or Command - """ - if not isinstance(initiator_cls, Iterable): - initiator_cls = [initiator_cls] - - decorated_cls_instance = self.construct_and_register_celery_task(consumer_cls) - - for initiator in initiator_cls: - if initiator.element_type == DomainObjects.EVENT: - self._subscribers[fully_qualified_name(initiator)].add( - decorated_cls_instance - ) - logger.debug( - f"Registered Subscriber {decorated_cls_instance.__class__.__name__} with queue " - "{self.celery_app.tasks} as Celery Task" - ) - else: - self._command_handlers[fully_qualified_name(initiator)] = ( - decorated_cls_instance - ) - - def publish(self, message: Message) -> None: - event_cls = self.domain.fetch_element_cls_from_registry( - message.type, (DomainObjects.EVENT,) - ) - for subscriber in self._subscribers[fully_qualified_name(event_cls)]: - if self.conn_info["IS_ASYNC"]: - subscriber.apply_async([message.data], queue=subscriber.name) - else: - subscriber.apply([message.data]) - - def get_next(self) -> Dict: - """Retrieve the next message to process from broker. - - Empty for Celery - """ - - def _data_reset(self) -> None: - """Flush all data in broker instance. - - Useful for clearing cache and running tests. 
- - Empty for Celery - """ diff --git a/src/protean/adapters/broker/inline.py b/src/protean/adapters/broker/inline.py index faee34fe..a3368e3b 100644 --- a/src/protean/adapters/broker/inline.py +++ b/src/protean/adapters/broker/inline.py @@ -1,31 +1,34 @@ +from collections import defaultdict from typing import TYPE_CHECKING, Dict from protean.port.broker import BaseBroker -from protean.utils import fully_qualified_name -from protean.utils.mixins import Message if TYPE_CHECKING: from protean.domain import Domain class InlineBroker(BaseBroker): - def __init__(self, name: str, domain: "Domain", conn_info: Dict[str, str]) -> None: + def __init__( + self, name: str, domain: "Domain", conn_info: Dict[str, str | bool] + ) -> None: super().__init__(name, domain, conn_info) # In case of `InlineBroker`, the `IS_ASYNC` value will always be `False`. conn_info["IS_ASYNC"] = False - def publish(self, message: Message) -> None: - initiator_obj = message.to_object() + # Initialize storage for messages + self._messages = defaultdict(list) - for subscriber in self._subscribers[ - fully_qualified_name(initiator_obj.__class__) - ]: - subscriber()(message.data) + def _publish(self, channel: str, message: dict) -> None: + """Publish a message dict to the channel""" + self._messages[channel].append(message) - def get_next(self) -> Dict: - """No-Op""" + def _get_next(self, channel: str) -> dict | None: + """Get next message in channel""" + if self._messages[channel]: + return self._messages[channel].pop(0) return None def _data_reset(self) -> None: - """No-Op""" + """Flush all data in broker instance""" + self._messages.clear() diff --git a/src/protean/adapters/broker/redis.py b/src/protean/adapters/broker/redis.py index a9943c4d..f4165be8 100644 --- a/src/protean/adapters/broker/redis.py +++ b/src/protean/adapters/broker/redis.py @@ -4,7 +4,6 @@ import redis from protean.port.broker import BaseBroker -from protean.utils.mixins import Message if TYPE_CHECKING: from protean.domain import 
Domain @@ -21,14 +20,14 @@ def __init__(self, name: str, domain: "Domain", conn_info: Dict) -> None: self.redis_instance = redis.Redis.from_url(conn_info["URI"]) - def publish(self, message: Message) -> None: + def _publish(self, channel: str, message: dict) -> None: # FIXME Accept configuration for database and list name - self.redis_instance.rpush("messages", json.dumps(message.to_dict())) + self.redis_instance.rpush(channel, json.dumps(message)) - def get_next(self) -> Message: - bytes_message = self.redis_instance.lpop("messages") + def _get_next(self, channel: str) -> dict | None: + bytes_message = self.redis_instance.lpop(channel) if bytes_message: - return Message(json.loads(bytes_message)) + return json.loads(bytes_message) return None diff --git a/src/protean/core/subscriber.py b/src/protean/core/subscriber.py index b1b092ea..22c9d827 100644 --- a/src/protean/core/subscriber.py +++ b/src/protean/core/subscriber.py @@ -1,12 +1,16 @@ +from __future__ import annotations + import logging from abc import abstractmethod -from typing import Any, Optional +from typing import TYPE_CHECKING, Type -from protean.core.event import BaseEvent from protean.exceptions import IncorrectUsageError, NotSupportedError from protean.utils import DomainObjects, derive_element_class from protean.utils.container import Element, OptionsMixin +if TYPE_CHECKING: + from protean.domain import Domain + logger = logging.getLogger(__name__) @@ -26,18 +30,18 @@ def __new__(cls, *args, **kwargs): @classmethod def _default_options(cls): - return [("broker", "default"), ("event", None)] + return [("broker", "default"), ("channel", None)] @abstractmethod - def __call__(self, event: BaseEvent) -> Optional[Any]: + def __call__(self, payload: dict) -> None: """Placeholder method for receiving notifications on event""" raise NotImplementedError -def subscriber_factory(element_cls, domain, **opts): +def subscriber_factory(element_cls: Type[Element], domain: "Domain", **opts): element_cls = 
derive_element_class(element_cls, BaseSubscriber, **opts) - if not element_cls.meta_.event: + if not element_cls.meta_.channel: raise IncorrectUsageError( f"Subscriber `{element_cls.__name__}` needs to be associated with an Event" ) diff --git a/src/protean/core/unit_of_work.py b/src/protean/core/unit_of_work.py index 2df32675..1e48473a 100644 --- a/src/protean/core/unit_of_work.py +++ b/src/protean/core/unit_of_work.py @@ -1,4 +1,5 @@ import logging +from typing import Any from protean.exceptions import ( ConfigurationError, @@ -6,7 +7,7 @@ InvalidOperationError, ValidationError, ) -from protean.utils import EventProcessing +from protean.utils import Processing from protean.utils.globals import _uow_context_stack, current_domain from protean.utils.reflection import id_field @@ -70,9 +71,9 @@ def commit(self): # noqa: C901 # Push messages to all brokers # FIXME Send message to its designated broker? # FIXME Send messages through domain.brokers.publish? - for message in self._messages_to_dispatch: + for channel, message in self._messages_to_dispatch: for _, broker in self.domain.brokers.items(): - broker.publish(message) + broker.publish(channel, message) self._messages_to_dispatch = [] # Empty after dispatch events = [] @@ -84,7 +85,7 @@ def commit(self): # noqa: C901 item._events = [] # Iteratively consume all events produced in this session - if current_domain.config["event_processing"] == EventProcessing.SYNC.value: + if current_domain.config["event_processing"] == Processing.SYNC.value: # Handover events to process instantly for _, event in events: handler_classes = current_domain.handlers_for(event) @@ -161,5 +162,5 @@ def get_session(self, provider_name): else: return self._initialize_session(provider_name) - def register_message(self, message): # FIXME Add annotations - self._messages_to_dispatch.append(message) + def register_message(self, channel: str, message: dict[str, Any]): + self._messages_to_dispatch.append((channel, message)) diff --git 
a/src/protean/domain/__init__.py b/src/protean/domain/__init__.py index 2fb4ca3d..ceff1cb1 100644 --- a/src/protean/domain/__init__.py +++ b/src/protean/domain/__init__.py @@ -31,9 +31,8 @@ from protean.fields import HasMany, HasOne, Reference, ValueObject from protean.fields import List as ProteanList from protean.utils import ( - CommandProcessing, DomainObjects, - EventProcessing, + Processing, fqn, ) from protean.utils.container import Element @@ -989,25 +988,9 @@ def view(self, _cls=None, **kwargs): # Broker Functionality # ######################## - def publish(self, events: Union[BaseEvent, List[BaseEvent]]) -> None: - """Publish Events to all configured brokers. - Args: - events (BaseEvent): The Event object containing data to be pushed - """ - if not isinstance(events, list): - events = [events] - - for event in events: - # Persist event in Message Store - self.event_store.store.append(event) - - self.brokers.publish(event) - - if self.config["event_processing"] == EventProcessing.SYNC.value: - # Consume events right-away - handler_classes = self.handlers_for(event) - for handler_cls in handler_classes: - handler_cls._handle(event) + def publish(self, channel: str, message: dict) -> None: + """Publish messages to all configured brokers.""" + self.brokers.publish(channel, message) ##################### # Handling Commands # @@ -1079,7 +1062,7 @@ def process(self, command: BaseCommand, asynchronous: bool = True) -> Optional[A if ( not asynchronous - or self.config["command_processing"] == CommandProcessing.SYNC.value + or self.config["command_processing"] == Processing.SYNC.value ): handler_class = self.command_handler_for(command) if handler_class: diff --git a/src/protean/domain/config.py b/src/protean/domain/config.py index b8f31209..d59740c8 100644 --- a/src/protean/domain/config.py +++ b/src/protean/domain/config.py @@ -5,7 +5,7 @@ import tomllib from protean.exceptions import ConfigurationError -from protean.utils import CommandProcessing, 
EventProcessing +from protean.utils import Processing logger = logging.getLogger(__name__) @@ -31,8 +31,9 @@ def _default_config(): "default": {"provider": "memory"}, "memory": {"provider": "memory"}, }, - "event_processing": EventProcessing.ASYNC.value, - "command_processing": CommandProcessing.ASYNC.value, + "event_processing": Processing.ASYNC.value, + "command_processing": Processing.ASYNC.value, + "message_processing": Processing.ASYNC.value, "event_store": { "provider": "memory", }, diff --git a/src/protean/port/broker.py b/src/protean/port/broker.py index 70fcabdf..c5b4b5a8 100644 --- a/src/protean/port/broker.py +++ b/src/protean/port/broker.py @@ -4,48 +4,73 @@ import logging.config from abc import ABCMeta, abstractmethod from collections import defaultdict -from collections.abc import Iterable -from typing import Any, Dict, Union +from typing import TYPE_CHECKING, Type -from protean.core.command import BaseCommand -from protean.core.command_handler import BaseCommandHandler -from protean.core.event import BaseEvent from protean.core.subscriber import BaseSubscriber -from protean.utils import DomainObjects, fully_qualified_name +from protean.utils import Processing + +if TYPE_CHECKING: + from protean.domain import Domain logger = logging.getLogger(__name__) class BaseBroker(metaclass=ABCMeta): - """This class outlines the base broker functions, - to be satisfied by all implementing brokers. + """This class outlines the base broker functions, to be satisfied by all implementing brokers. 
- It is also a marker interface for registering broker - classes with the domain""" + It is also a marker interface for registering broker classes with the domain""" # FIXME Replace with typing.Protocol def __init__( - self, name: str, domain: Any, conn_info: Dict[str, str] - ) -> None: # FIXME Any should be Domain + self, name: str, domain: "Domain", conn_info: dict[str, str | bool] + ) -> None: self.name = name self.domain = domain self.conn_info = conn_info self._subscribers = defaultdict(set) - self._command_handlers = {} + + def publish(self, channel: str, message: dict) -> None: + """Publish a message to the broker. + + Args: + channel (str): The channel to which the message should be published + message (dict): The message payload to be published + """ + self._publish(channel, message) + + if ( + self.domain.config["message_processing"] == Processing.SYNC.value + and self._subscribers[channel] + ): + for subscriber_cls in self._subscribers[channel]: + subscriber = subscriber_cls() + subscriber(message) @abstractmethod - def publish(self, message: Dict) -> None: - """Publish a message with Protean-compatible payload to the configured Message bus. + def _publish(self, channel: str, message: dict) -> None: + """Overidden method to publish a message with payload to the configured broker. Args: - message (Dict): Command or Event payload + channel (str): The channel to which the message should be published + message (dict): The message payload to be published """ + def get_next(self, channel: str) -> dict | None: + """Retrieve the next message to process from broker. 
+ + Args: + channel (str): The channel from which to retrieve the message + + Returns: + dict: The message payload + """ + return self._get_next(channel) + @abstractmethod - def get_next(self) -> Dict: - """Retrieve the next message to process from broker.""" + def _get_next(self, channel: str) -> dict | None: + """Overridden method to retrieve the next message to process from broker.""" @abstractmethod def _data_reset(self) -> None: @@ -54,25 +79,16 @@ def _data_reset(self) -> None: Useful for clearing cache and running tests. """ - def register( - self, - initiator_cls: Union[BaseCommand, BaseEvent], - consumer_cls: Union[BaseCommandHandler, BaseSubscriber], - ) -> None: - """Registers Events and Commands with Subscribers/Command Handlers + def register(self, subscriber_cls: Type[BaseSubscriber]) -> None: + """Registers subscribers to brokers against their channels. Arguments: - initiator_cls {list} -- One or more Events or Commands - consumer_cls {Subscriber/CommandHandler} -- The consumer class connected to the Event or Command + subscriber_cls {Subscriber} -- The subscriber class connected to the channel """ - if not isinstance(initiator_cls, Iterable): - initiator_cls = [initiator_cls] - - for initiator in initiator_cls: - if initiator.element_type == DomainObjects.EVENT: - self._subscribers[fully_qualified_name(initiator)].add(consumer_cls) - logger.debug( - f"Registered Subscriber {consumer_cls.__name__} with broker {self.name}" - ) - else: - self._command_handlers[fully_qualified_name(initiator)] = consumer_cls + channel = subscriber_cls.meta_.channel + + self._subscribers[channel].add(subscriber_cls) + + logger.debug( + f"Broker {self.name}: Registered Subscriber {subscriber_cls.__name__} for channel {channel}" + ) diff --git a/src/protean/template/copier.yml b/src/protean/template/copier.yml index a476a0dc..d76d7389 100644 --- a/src/protean/template/copier.yml +++ b/src/protean/template/copier.yml @@ -36,7 +36,6 @@ broker: choices: Memory: memory Redis: 
redis - Celery: celery default: memory cache: diff --git a/src/protean/utils/__init__.py b/src/protean/utils/__init__.py index b1f9f56c..079660a0 100644 --- a/src/protean/utils/__init__.py +++ b/src/protean/utils/__init__.py @@ -33,12 +33,7 @@ class IdentityType(Enum): UUID = "uuid" -class EventProcessing(Enum): - SYNC = "sync" - ASYNC = "async" - - -class CommandProcessing(Enum): +class Processing(Enum): SYNC = "sync" ASYNC = "async" @@ -113,7 +108,11 @@ class DomainObjects(Enum): VIEW = "VIEW" -def derive_element_class(element_cls, base_cls, **opts) -> Element: +def derive_element_class( + element_cls: Type[Element] | Type[Any], + base_cls: Type[Element], + **opts: dict[str, str | bool], +) -> Type[Element]: from protean.utils.container import Options # Ensure options being passed in are known @@ -193,17 +192,16 @@ def generate_identity( __all__ = [ "Cache", - "CommandProcessing", - "Database", - "DomainObjects", - "EventProcessing", - "IdentityStrategy", - "IdentityType", - "TypeMatcher", "convert_str_values_to_list", + "Database", "derive_element_class", + "DomainObjects", "fully_qualified_name", "generate_identity", "get_version", + "IdentityStrategy", + "IdentityType", + "Processing", + "TypeMatcher", "utcnow_func", ] diff --git a/src/protean/utils/mixins.py b/src/protean/utils/mixins.py index 60647389..1b2e9cb2 100644 --- a/src/protean/utils/mixins.py +++ b/src/protean/utils/mixins.py @@ -4,7 +4,7 @@ import logging from collections import defaultdict from enum import Enum -from typing import Callable, Dict, Union +from typing import Callable, Dict, Type, Union from protean import fields from protean.core.command import BaseCommand @@ -128,7 +128,7 @@ def to_message(cls, message_object: Union[BaseEvent, BaseCommand]) -> Message: class handle: """Class decorator to mark handler methods in EventHandler and CommandHandler classes.""" - def __init__(self, target_cls: Union[BaseEvent, BaseCommand]) -> None: + def __init__(self, target_cls: Type[BaseEvent] | 
Type[BaseCommand]) -> None: self._target_cls = target_cls def __call__(self, fn: Callable) -> Callable: diff --git a/tests/adapters/broker/celery_broker/__init__.py b/tests/adapters/broker/celery_broker/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/adapters/broker/celery_broker/conftest.py b/tests/adapters/broker/celery_broker/conftest.py deleted file mode 100644 index 8357ca7f..00000000 --- a/tests/adapters/broker/celery_broker/conftest.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -from tests.shared import initialize_domain - - -@pytest.fixture(autouse=True) -def test_domain(): - domain = initialize_domain(__file__, "Celery Broker Tests") - domain.init(traverse=False) - - with domain.domain_context(): - yield domain diff --git a/tests/adapters/broker/celery_broker/domain.toml b/tests/adapters/broker/celery_broker/domain.toml deleted file mode 100644 index 21b1d72e..00000000 --- a/tests/adapters/broker/celery_broker/domain.toml +++ /dev/null @@ -1,4 +0,0 @@ -[brokers.default] -provider = "celery" -URI = "redis://127.0.0.1:6379/2" -IS_ASYNC = true \ No newline at end of file diff --git a/tests/adapters/broker/celery_broker/elements.py b/tests/adapters/broker/celery_broker/elements.py deleted file mode 100644 index 58242942..00000000 --- a/tests/adapters/broker/celery_broker/elements.py +++ /dev/null @@ -1,51 +0,0 @@ -from protean.core.aggregate import BaseAggregate -from protean.core.event import BaseEvent -from protean.core.subscriber import BaseSubscriber -from protean.fields import Auto, Integer, String -from protean.utils.globals import current_domain - - -class Person(BaseAggregate): - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - @classmethod - def add_newcomer(cls, person_dict): - """Factory method to add a new Person to the system""" - newcomer = Person( - first_name=person_dict["first_name"], - last_name=person_dict["last_name"], - 
age=person_dict["age"], - ) - - # Publish Event via the domain - current_domain.publish( - PersonAdded( - id=newcomer.id, - first_name=newcomer.first_name, - last_name=newcomer.last_name, - age=newcomer.age, - ) - ) - - return newcomer - - -class PersonAdded(BaseEvent): - id = Auto(identifier=True) - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - -class NotifySSOSubscriber(BaseSubscriber): - """Subscriber that notifies an external SSO system - that a new person was added into the system - """ - - def __call__(self, domain_event_dict): - print("Received Event: ", domain_event_dict) - print("Event class: ", self.meta_.event) - - print("Current domain: ", current_domain.name) diff --git a/tests/adapters/broker/celery_broker/test_subscriber.py b/tests/adapters/broker/celery_broker/test_subscriber.py deleted file mode 100644 index 87e02558..00000000 --- a/tests/adapters/broker/celery_broker/test_subscriber.py +++ /dev/null @@ -1,44 +0,0 @@ -import pytest -from celery import Task - -from protean.adapters.broker.celery import CeleryBroker, ProteanTask -from tests.adapters.broker.celery_broker.elements import ( - NotifySSOSubscriber, - Person, - PersonAdded, -) - - -class TestSubscriberNotifications: - @pytest.fixture(autouse=True) - def register(self, test_domain): - test_domain.register(Person) - test_domain.register(PersonAdded, part_of=Person) - test_domain.register(NotifySSOSubscriber, event=PersonAdded) - test_domain.init(traverse=False) - - @pytest.fixture - def broker(self, test_domain): - return test_domain.brokers["default"] - - @pytest.fixture - def decorated_task_obj(self, broker): - return broker.construct_and_register_celery_task(NotifySSOSubscriber) - - def test_that_broker_is_celery(self, broker): - assert isinstance(broker, CeleryBroker) - - def test_task_class_construction_and_registration(self, broker, decorated_task_obj): - assert decorated_task_obj is not None - 
assert isinstance(decorated_task_obj, ProteanTask) - assert isinstance(decorated_task_obj, Task) - - assert ( - decorated_task_obj.name - == "tests.adapters.broker.celery_broker.elements.notify_sso_subscriber" - ) - assert decorated_task_obj.name in broker.celery_app.tasks - - @pytest.mark.skip(reason="Yet to implement") - def test_queue_associated_with_subscriber(self): - pass diff --git a/tests/adapters/broker/celery_broker/tests.py b/tests/adapters/broker/celery_broker/tests.py deleted file mode 100644 index a11e0115..00000000 --- a/tests/adapters/broker/celery_broker/tests.py +++ /dev/null @@ -1,37 +0,0 @@ -import pytest -from mock import patch - -from protean.adapters.broker.celery import CeleryBroker -from protean.utils.mixins import Message -from tests.adapters.broker.celery_broker.elements import ( - NotifySSOSubscriber, - Person, - PersonAdded, -) - - -@pytest.mark.redis -class TestRedisConnection: - def test_that_configured_broker_is_celery_with_redis(self, test_domain): - assert "default" in test_domain.brokers - broker = test_domain.brokers["default"] - - assert isinstance(broker, CeleryBroker) - assert broker.conn_info["URI"] == "redis://127.0.0.1:6379/2" - assert broker.celery_app is not None - - -@pytest.mark.redis -class TestEventProcessing: - @pytest.fixture(autouse=True) - def register(self, test_domain): - test_domain.register(Person) - test_domain.register(PersonAdded, part_of=Person) - test_domain.register(NotifySSOSubscriber, event=PersonAdded) - test_domain.init(traverse=False) - - @patch.object(CeleryBroker, "publish") - def test_that_an_event_is_published_to_the_broker(self, mock): - Person.add_newcomer({"first_name": "John", "last_name": "Doe", "age": 21}) - mock.assert_called_once() - assert isinstance(mock.call_args.args[0], Message) diff --git a/tests/adapters/broker/redis_broker/tests.py b/tests/adapters/broker/redis_broker/tests.py index 703725dc..0429ce3c 100644 --- a/tests/adapters/broker/redis_broker/tests.py +++ 
b/tests/adapters/broker/redis_broker/tests.py @@ -4,23 +4,18 @@ import redis from protean.adapters.broker.redis import RedisBroker -from protean.utils.globals import current_domain - -from .elements import Person, PersonAdded @pytest.fixture(autouse=True) -def register_elements(test_domain): - test_domain.register(Person) - test_domain.register(PersonAdded, part_of=Person) +def init_domain(test_domain): test_domain.init(traverse=False) @pytest.mark.redis class TestRedisConnection: - def test_that_redis_is_the_configured_broker(self): - assert "default" in current_domain.brokers - broker = current_domain.brokers["default"] + def test_that_redis_is_the_configured_broker(self, test_domain): + assert "default" in test_domain.brokers + broker = test_domain.brokers["default"] assert isinstance(broker, RedisBroker) assert broker.conn_info["URI"] == "redis://127.0.0.1:6379/0" @@ -31,75 +26,36 @@ def test_that_redis_is_the_configured_broker(self): @pytest.mark.redis class TestPublishingToRedis: def test_event_message_structure(self, test_domain): - # Publish event - event = PersonAdded( - id="1234", - first_name="John", - last_name="Doe", - age=24, - ) - test_domain.publish(event) + channel = "test_channel" + message = {"key": "value"} + + test_domain.brokers["default"].publish(channel, message) # Retrieve with an independent Redis instance r = redis.Redis.from_url(test_domain.config["brokers"]["default"]["URI"]) - message = r.lpop("messages") + message = r.lpop(channel) assert message is not None # Verify Structure json_message = json.loads(message) - assert all( - key in json_message - for key in [ - "global_position", - "position", - "time", - "id", - "stream_name", - "type", - "data", - "metadata", - ] - ) - assert json_message["type"] == "RedisBrokerTests.PersonAdded.v1" - assert json_message["metadata"]["kind"] == "EVENT" + assert json_message == {"key": "value"} @pytest.mark.redis class TestReceivingFromRedis: def test_for_no_error_on_no_message(self, test_domain): - 
message = test_domain.brokers["default"].get_next() + message = test_domain.brokers["default"].get_next("test_channel") assert message is None def test_retrieving_an_event_message(self, test_domain): - # Publish event - event = PersonAdded( - id="1234", - first_name="John", - last_name="Doe", - age=24, - ) - test_domain.publish(event) - - # Retrieve event - message = test_domain.brokers["default"].get_next() + channel = "test_channel" + message = {"key": "value"} - # Verify Payload - assert message is not None - assert message.data["id"] == event.id - - def test_reconstructing_an_event_object_from_message(self, test_domain): - # Publish event - event = PersonAdded( - id="1234", - first_name="John", - last_name="Doe", - age=24, - ) - test_domain.publish(event) + test_domain.brokers["default"].publish(channel, message) # Retrieve message - message = test_domain.brokers["default"].get_next() + message = test_domain.brokers["default"].get_next(channel) - # Verify reconstructed event object - retrieved_event = message.to_object() - assert retrieved_event == event + # Verify Payload + assert message is not None + assert message == {"key": "value"} diff --git a/tests/broker/test_initialization.py b/tests/broker/test_initialization.py new file mode 100644 index 00000000..e4837862 --- /dev/null +++ b/tests/broker/test_initialization.py @@ -0,0 +1,85 @@ +import pytest + +from protean.adapters.broker.inline import InlineBroker +from protean.exceptions import ConfigurationError +from protean.port.broker import BaseBroker + + +class TestBrokerInitialization: + def test_that_base_broker_class_cannot_be_instantiated(self): + with pytest.raises(TypeError): + BaseBroker() + + def test_that_a_concrete_broker_can_be_initialized_successfully(self, test_domain): + broker = InlineBroker("dummy_name", test_domain, {}) + + assert broker is not None + + def test_that_domain_initializes_broker_from_config(self, test_domain): + assert len(list(test_domain.brokers)) == 1 + assert 
isinstance(list(test_domain.brokers.values())[0], InlineBroker) + + def test_that_domain_initializes_broker_before_iteration(self, test_domain): + brokers = [broker for broker in test_domain.brokers] + assert len(brokers) == 1 + + def test_that_brokers_are_not_initialized_again_before_get_op_if_initialized_already( + self, mocker, test_domain + ): + # Initialize brokers + len(test_domain.brokers) + + spy = mocker.spy(test_domain.brokers, "_initialize") + test_domain.brokers["default"] # Calls `__getitem__`, should not reinitialize + assert spy.call_count == 0 + + def test_that_brokers_are_not_initialized_again_before_set_if_initialized_already( + self, mocker, test_domain + ): + # Initialize brokers + len(test_domain.brokers) + + dup_broker = InlineBroker("duplicate broker", test_domain, {}) + + spy = mocker.spy(test_domain.brokers, "_initialize") + test_domain.brokers["dup"] = dup_broker # Should not reinitialize + assert spy.call_count == 0 + + def test_that_brokers_are_not_initialized_again_before_del_if_initialized_already( + self, mocker, test_domain + ): + len(test_domain.brokers) + + spy = mocker.spy(test_domain.brokers, "_initialize") + del test_domain.brokers["default"] + assert spy.call_count == 0 + + def test_that_brokers_can_be_registered_manually(self, test_domain): + duplicate_broker = InlineBroker("duplicate broker", test_domain, {}) + + test_domain.brokers["duplicate"] = duplicate_broker + assert len(test_domain.brokers) == 2 + + def test_default_broker_is_mandatory(self, test_domain): + test_domain.config["brokers"]["secondary"] = {"provider": "inline"} + del test_domain.config["brokers"]["default"] + + with pytest.raises(ConfigurationError) as exc: + test_domain.init(traverse=False) + + assert str(exc.value) == "You must define a 'default' broker" + + def test_at_least_one_broker_must_be_configured(self, test_domain): + del test_domain.config["brokers"]["default"] + + with pytest.raises(ConfigurationError) as exc: + 
test_domain.init(traverse=False) + + assert str(exc.value) == "Configure at least one broker in the domain" + + def test_deleting_unknown_brokers_is_safe(self, test_domain): + try: + del test_domain.brokers["imaginary"] + except Exception: + pytest.fail("Deleting an unknown broker should not raise an exception") + assert len(test_domain.brokers) == 1 diff --git a/tests/broker/test_initializing_subscribers.py b/tests/broker/test_initializing_subscribers.py new file mode 100644 index 00000000..a94d9321 --- /dev/null +++ b/tests/broker/test_initializing_subscribers.py @@ -0,0 +1,31 @@ +import pytest + +from protean.core.subscriber import BaseSubscriber +from protean.exceptions import ConfigurationError + + +class DummySubscriber(BaseSubscriber): + def __call__(self, data: dict): + print("Received data: ", data) + + +def test_that_registered_subscribers_are_initialized(test_domain): + test_domain.register(DummySubscriber, channel="person_added") + test_domain.init(traverse=False) + + assert "person_added" in test_domain.brokers["default"]._subscribers + assert ( + DummySubscriber in test_domain.brokers["default"]._subscribers["person_added"] + ) + + +def test_that_subscribers_with_unknown_brokers_cannot_be_initialized(test_domain): + test_domain.register(DummySubscriber, channel="person_added", broker="unknown") + + with pytest.raises(ConfigurationError) as exc: + test_domain.init(traverse=False) + + assert "Broker `unknown` has not been configured." 
in str(exc.value) + + # Reset the broker after test + DummySubscriber.meta_.broker = "default" diff --git a/tests/broker/test_publish_to_all_brokers.py b/tests/broker/test_publish_to_all_brokers.py new file mode 100644 index 00000000..53716f2a --- /dev/null +++ b/tests/broker/test_publish_to_all_brokers.py @@ -0,0 +1,18 @@ +import pytest + + +@pytest.fixture(autouse=True) +def setup(test_domain): + test_domain.config["brokers"]["secondary"] = {"provider": "inline"} + test_domain.init(traverse=False) + + +def test_publish_to_channel(test_domain): + channel = "test_channel" + message = {"foo": "bar"} + + test_domain.brokers.publish(channel, message) + + # Verify message is stored + assert test_domain.brokers["default"]._messages[channel] == [message] + assert test_domain.brokers["secondary"]._messages[channel] == [message] diff --git a/tests/adapters/broker/redis_broker/elements.py b/tests/broker/test_publish_within_uow.py similarity index 50% rename from tests/adapters/broker/redis_broker/elements.py rename to tests/broker/test_publish_within_uow.py index 58242942..7aef3129 100644 --- a/tests/adapters/broker/redis_broker/elements.py +++ b/tests/broker/test_publish_within_uow.py @@ -1,8 +1,9 @@ +import pytest + from protean.core.aggregate import BaseAggregate from protean.core.event import BaseEvent -from protean.core.subscriber import BaseSubscriber +from protean.core.unit_of_work import UnitOfWork from protean.fields import Auto, Integer, String -from protean.utils.globals import current_domain class Person(BaseAggregate): @@ -14,13 +15,13 @@ class Person(BaseAggregate): def add_newcomer(cls, person_dict): """Factory method to add a new Person to the system""" newcomer = Person( + id=person_dict["id"], first_name=person_dict["first_name"], last_name=person_dict["last_name"], age=person_dict["age"], ) - # Publish Event via the domain - current_domain.publish( + newcomer.raise_( PersonAdded( id=newcomer.id, first_name=newcomer.first_name, @@ -28,7 +29,6 @@ def 
add_newcomer(cls, person_dict): age=newcomer.age, ) ) - return newcomer @@ -39,13 +39,28 @@ class PersonAdded(BaseEvent): age = Integer(default=21) -class NotifySSOSubscriber(BaseSubscriber): - """Subscriber that notifies an external SSO system - that a new person was added into the system - """ +@pytest.fixture(autouse=True) +def register_elements(test_domain): + test_domain.register(Person) + test_domain.register(PersonAdded, part_of=Person) + test_domain.init(traverse=False) + + +def test_message_push_after_uow_exit(test_domain): + with UnitOfWork(): + person = Person.add_newcomer( + {"id": "1", "first_name": "John", "last_name": "Doe", "age": 25} + ) + + test_domain.repository_for(Person).add(person) + test_domain.publish("person_added", person._events[0].to_dict()) - def __call__(self, domain_event_dict): - print("Received Event: ", domain_event_dict) - print("Event class: ", self.meta_.event) + assert test_domain.brokers["default"].get_next("person_added") is None - print("Current domain: ", current_domain.name) + message = test_domain.brokers["default"].get_next("person_added") + assert message is not None + assert message["id"] == "1" + assert message["first_name"] == "John" + assert message["last_name"] == "Doe" + assert message["age"] == 25 + assert "_metadata" in message diff --git a/tests/broker/test_publishing_and_retrieving_messages.py b/tests/broker/test_publishing_and_retrieving_messages.py new file mode 100644 index 00000000..983dc094 --- /dev/null +++ b/tests/broker/test_publishing_and_retrieving_messages.py @@ -0,0 +1,55 @@ +def test_publish_to_channel(test_domain): + channel = "test_channel" + message = {"foo": "bar"} + + test_domain.brokers["default"].publish(channel, message) + + # Verify message is stored + assert test_domain.brokers["default"]._messages[channel] == [message] + + +def test_get_next_message(test_domain): + channel = "test_channel" + message1 = {"key1": "value1"} + message2 = {"key2": "value2"} + + 
test_domain.brokers["default"].publish(channel, message1) + test_domain.brokers["default"].publish(channel, message2) + + # Retrieve the first message + retrieved_message = test_domain.brokers["default"].get_next(channel) + assert retrieved_message == message1 + + # Retrieve the second message + retrieved_message = test_domain.brokers["default"].get_next(channel) + assert retrieved_message == message2 + + # No more messages, should return None + retrieved_message = test_domain.brokers["default"].get_next(channel) + assert retrieved_message is None + + +def test_data_reset(test_domain): + channel1 = "test_channel1" + channel2 = "test_channel2" + message1 = {"key1": "value1"} + message2 = {"key2": "value2"} + + test_domain.brokers["default"].publish(channel1, message1) + test_domain.brokers["default"].publish(channel2, message2) + + # Reset the broker data + test_domain.brokers["default"]._data_reset() + + # Verify all messages are cleared + assert test_domain.brokers["default"]._messages[channel1] == [] + assert test_domain.brokers["default"]._messages[channel2] == [] + assert test_domain.brokers["default"]._messages == { + "test_channel1": [], + "test_channel2": [], + } + + +def test_is_async_flag(test_domain): + # Verify that the IS_ASYNC flag is set to False + assert test_domain.brokers["default"].conn_info["IS_ASYNC"] is False diff --git a/tests/broker/test_sync_processing.py b/tests/broker/test_sync_processing.py new file mode 100644 index 00000000..21a04b7a --- /dev/null +++ b/tests/broker/test_sync_processing.py @@ -0,0 +1,28 @@ +from protean.core.subscriber import BaseSubscriber +from protean.utils import Processing + +counter = 0 + + +def count_up(): + global counter + counter += 1 + + +class DummySubscriber(BaseSubscriber): + def __call__(self, data: dict): + count_up() + + +def test_subscriber_sync_invocation(test_domain): + test_domain.config["message_processing"] = Processing.SYNC.value + test_domain.register(DummySubscriber, 
channel="test_channel") + test_domain.init(traverse=False) + + channel = "test_channel" + message = {"foo": "bar"} + + test_domain.brokers["default"].publish(channel, message) + + global counter + assert counter == 1 diff --git a/tests/command_handler/test_inline_command_processing.py b/tests/command_handler/test_inline_command_processing.py index a1e3289a..76d2e9b2 100644 --- a/tests/command_handler/test_inline_command_processing.py +++ b/tests/command_handler/test_inline_command_processing.py @@ -7,7 +7,7 @@ from protean.core.command_handler import BaseCommandHandler from protean.exceptions import ConfigurationError from protean.fields import Identifier, String -from protean.utils import CommandProcessing +from protean.utils import Processing from protean.utils.mixins import handle counter = 0 @@ -20,7 +20,7 @@ class User(BaseAggregate): class Register(BaseCommand): - user_id = Identifier() + user_id = Identifier(identifier=True) email = String() @@ -54,7 +54,7 @@ def test_that_command_can_be_processed_inline(test_domain): test_domain.register(UserCommandHandlers, part_of=User) test_domain.init(traverse=False) - assert test_domain.config["command_processing"] == CommandProcessing.SYNC.value + assert test_domain.config["command_processing"] == Processing.SYNC.value test_domain.process(Register(user_id=str(uuid4()), email="john.doe@gmail.com")) assert counter == 1 @@ -67,4 +67,4 @@ def test_that_command_is_persisted_in_message_store(test_domain): messages = test_domain.event_store.store.read("user:command") assert len(messages) == 1 - messages[0].stream_name == f"user:command-{identifier}" + assert messages[0].stream_name == f"test::user:command-{identifier}" diff --git a/tests/event_store/test_inline_event_processing_on_publish.py b/tests/event_store/test_inline_event_processing_on_publish.py deleted file mode 100644 index b2649ffd..00000000 --- a/tests/event_store/test_inline_event_processing_on_publish.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -For tests related to 
inline processing of events raised in event-sourced aggregates, -check tests/unit_of_work/test_inline_event_processing.py -""" - -from __future__ import annotations - -from uuid import uuid4 - -import pytest - -from protean.core.aggregate import BaseAggregate -from protean.core.event import BaseEvent -from protean.core.event_handler import BaseEventHandler -from protean.fields import Identifier, String -from protean.utils.globals import current_domain -from protean.utils.mixins import handle - -counter = 0 - - -def count_up(): - global counter - counter += 1 - - -class User(BaseAggregate): - user_id = Identifier(identifier=True) - email = String() - name = String() - password_hash = String() - - -class Registered(BaseEvent): - user_id = Identifier() - email = String() - name = String() - password_hash = String() - - -class UserEventHandler(BaseEventHandler): - @handle(Registered) - def registered(self, _: Registered) -> None: - count_up() - - -@pytest.mark.eventstore -def test_inline_event_processing_on_publish_in_sync_mode(test_domain): - test_domain.register(User, is_event_sourced=True, stream_category="user") - test_domain.register(Registered, part_of=User) - test_domain.register(UserEventHandler, stream_category="test::user") - test_domain.init(traverse=False) - - user = User( - user_id=str(uuid4()), - email="john.doe@example.com", - name="John Doe", - password_hash="hash", - ) - user.raise_( - Registered( - user_id=user.user_id, - email=user.email, - name=user.name, - password_hash=user.password_hash, - ) - ) - current_domain.publish(user._events[0]) - - global counter - assert counter == 1 diff --git a/tests/server/test_engine_run.py b/tests/server/test_engine_run.py index b2f7c8ad..694c5c60 100644 --- a/tests/server/test_engine_run.py +++ b/tests/server/test_engine_run.py @@ -8,7 +8,7 @@ from protean.core.event_handler import BaseEventHandler from protean.fields import Identifier from protean.server.engine import Engine -from protean.utils import 
EventProcessing +from protean.utils import Processing from protean.utils.mixins import handle counter = 0 @@ -49,7 +49,7 @@ def auto_set_and_close_loop(): @pytest.fixture(autouse=True) def register_elements(test_domain): - test_domain.config["event_processing"] = EventProcessing.ASYNC.value + test_domain.config["event_processing"] = Processing.ASYNC.value test_domain.register(User, stream_category="authentication") test_domain.register(UserLoggedIn, part_of=User) test_domain.register(UserEventHandler, stream_category="authentication") diff --git a/tests/test_brokers.py b/tests/test_brokers.py deleted file mode 100644 index 857f4bd3..00000000 --- a/tests/test_brokers.py +++ /dev/null @@ -1,180 +0,0 @@ -import pytest - -from protean.adapters.broker.inline import InlineBroker -from protean.core.aggregate import BaseAggregate -from protean.core.command import BaseCommand -from protean.core.event import BaseEvent -from protean.core.subscriber import BaseSubscriber -from protean.exceptions import ConfigurationError -from protean.fields import Auto, Integer, String -from protean.port.broker import BaseBroker - - -class Person(BaseAggregate): - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - -class PersonAdded(BaseEvent): - id = Auto(identifier=True) - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - -class NotifySSOSubscriber(BaseSubscriber): - def __call__(self, domain_event_dict): - pass - - -class AddPersonCommand(BaseCommand): - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - -@pytest.fixture(autouse=True) -def register_elements(test_domain): - test_domain.register(Person) - test_domain.register(PersonAdded, part_of=Person) - test_domain.register(NotifySSOSubscriber, event=PersonAdded) - 
test_domain.init(traverse=False) - - -class TestBrokerInitialization: - def test_that_base_broker_class_cannot_be_instantiated(self): - with pytest.raises(TypeError): - BaseBroker() - - def test_that_a_concrete_broker_can_be_initialized_successfully(self, test_domain): - broker = InlineBroker("dummy_name", test_domain, {}) - - assert broker is not None - - def test_that_domain_initializes_broker_from_config(self, test_domain): - assert len(list(test_domain.brokers)) == 1 - assert isinstance(list(test_domain.brokers.values())[0], InlineBroker) - - def test_that_domain_initializes_broker_before_iteration(self, test_domain): - brokers = [broker for broker in test_domain.brokers] - assert len(brokers) == 1 - - def test_that_brokers_are_not_initialized_again_before_get_op_if_initialized_already( - self, mocker, test_domain - ): - # Initialize brokers - len(test_domain.brokers) - - spy = mocker.spy(test_domain.brokers, "_initialize") - test_domain.brokers["default"] # # Calls `__getitem__`, Should not reinitialize - assert spy.call_count == 0 - - def test_that_brokers_are_not_initialized_again_before_set_if_initialized_already( - self, mocker, test_domain - ): - # Initialize brokers - len(test_domain.brokers) - - dup_broker = InlineBroker("duplicate broker", test_domain, {}) - - spy = mocker.spy(test_domain.brokers, "_initialize") - test_domain.brokers["dup"] = dup_broker # Should not reinitialize - assert spy.call_count == 0 - - def test_that_brokers_are_not_initialized_again_before_del_if_initialized_already( - self, mocker, test_domain - ): - len(test_domain.brokers) - - spy = mocker.spy(test_domain.brokers, "_initialize") - del test_domain.brokers["default"] - assert spy.call_count == 0 - - def test_that_brokers_can_be_registered_manually(self, test_domain): - duplicate_broker = InlineBroker("duplicate broker", test_domain, {}) - - test_domain.brokers["duplicate"] = duplicate_broker - assert len(test_domain.brokers) == 2 - - -class TestEventPublish: - 
@pytest.mark.eventstore - def test_that_event_is_persisted_on_publish(self, mocker, test_domain): - person = Person(first_name="John", last_name="Doe", age=24) - person.raise_( - PersonAdded( - id=person.id, - first_name=person.first_name, - last_name=person.last_name, - age=person.age, - ) - ) - test_domain.publish(person._events[0]) - - messages = test_domain.event_store.store.read("test::person") - - assert len(messages) == 1 - messages[0].stream_name == "person-1234" - - @pytest.mark.eventstore - def test_that_multiple_events_are_persisted_on_publish(self, mocker, test_domain): - person1 = Person(id="1234", first_name="John", last_name="Doe", age=24) - person1.raise_( - PersonAdded( - id=person1.id, - first_name=person1.first_name, - last_name=person1.last_name, - age=person1.age, - ) - ) - person2 = Person(id="1235", first_name="Jane", last_name="Doe", age=23) - person2.raise_( - PersonAdded( - id=person2.id, - first_name=person2.first_name, - last_name=person2.last_name, - age=person2.age, - ) - ) - test_domain.publish( - [ - person1._events[0], - person2._events[0], - ] - ) - - messages = test_domain.event_store.store.read("test::person") - - assert len(messages) == 2 - assert messages[0].stream_name == "test::person-1234" - assert messages[1].stream_name == "test::person-1235" - - -class TestBrokerSubscriberInitialization: - def test_that_registered_subscribers_are_initialized(self, test_domain): - test_domain._initialize() - - assert ( - "tests.test_brokers.PersonAdded" - in test_domain.brokers["default"]._subscribers - ) - assert ( - NotifySSOSubscriber - in test_domain.brokers["default"]._subscribers[ - "tests.test_brokers.PersonAdded" - ] - ) - - def test_that_subscribers_with_unknown_brokers_cannot_be_initialized( - self, test_domain - ): - test_domain.register(NotifySSOSubscriber, event=PersonAdded, broker="unknown") - - with pytest.raises(ConfigurationError) as exc: - test_domain._initialize() - - assert "Broker `unknown` has not been configured." 
in str(exc.value) - - # Reset the broker after test - NotifySSOSubscriber.meta_.broker = "default" diff --git a/tests/test_subscribers.py b/tests/test_subscribers.py index fce842d3..b75538ac 100644 --- a/tests/test_subscribers.py +++ b/tests/test_subscribers.py @@ -1,26 +1,17 @@ import pytest -from protean.core.event import BaseEvent from protean.core.subscriber import BaseSubscriber from protean.exceptions import NotSupportedError -from protean.fields import Identifier, Integer, String from protean.utils import fully_qualified_name -class PersonAdded(BaseEvent): - id = Identifier(required=True) - first_name = String(max_length=50, required=True) - last_name = String(max_length=50, required=True) - age = Integer(default=21) - - class NotifySSOSubscriber(BaseSubscriber): """Subscriber that notifies an external SSO system that a new person was added into the system """ - def notify(self, event): - print("Received Event: ", event) + def notify(self, data: dict): + print("Received data: ", data) class TestSubscriberInitialization: @@ -35,7 +26,7 @@ def test_that_subscriber_can_be_instantiated(self, test_domain): class TestSubscriberRegistration: def test_that_domain_event_can_be_registered_with_domain(self, test_domain): - test_domain.register(NotifySSOSubscriber, event=PersonAdded) + test_domain.register(NotifySSOSubscriber, channel="person_added") assert ( fully_qualified_name(NotifySSOSubscriber) @@ -43,7 +34,7 @@ def test_that_domain_event_can_be_registered_with_domain(self, test_domain): ) def test_that_domain_event_can_be_registered_via_annotations(self, test_domain): - @test_domain.subscriber(event=PersonAdded) + @test_domain.subscriber(channel="person_added") class AnnotatedSubscriber: def special_method(self): pass diff --git a/tests/workflows/test_event_flows.py b/tests/workflows/test_event_flows.py index 5d7975da..644892d3 100644 --- a/tests/workflows/test_event_flows.py +++ b/tests/workflows/test_event_flows.py @@ -32,7 +32,7 @@ ValueObject, ) from 
protean.server import Engine -from protean.utils import CommandProcessing, EventProcessing +from protean.utils import Processing from protean.utils.globals import current_domain from protean.utils.mixins import handle @@ -169,8 +169,8 @@ def test_domain(): "provider": "message_db", "database_uri": "postgresql://message_store@localhost:5433/message_store", } - test_domain.config["command_processing"] = CommandProcessing.ASYNC.value - test_domain.config["event_processing"] = EventProcessing.ASYNC.value + test_domain.config["command_processing"] = Processing.ASYNC.value + test_domain.config["event_processing"] = Processing.ASYNC.value test_domain.register(Order) test_domain.register(OrderItem, part_of=Order) @@ -198,8 +198,8 @@ def shipment_domain(): "provider": "message_db", "database_uri": "postgresql://message_store@localhost:5433/message_store", } - shipment_domain.config["command_processing"] = CommandProcessing.ASYNC.value - shipment_domain.config["event_processing"] = EventProcessing.ASYNC.value + shipment_domain.config["command_processing"] = Processing.ASYNC.value + shipment_domain.config["event_processing"] = Processing.ASYNC.value shipment_domain.register(Shipment) shipment_domain.register(