diff --git a/setup.py b/setup.py index 20c8078e..243c06d8 100644 --- a/setup.py +++ b/setup.py @@ -70,6 +70,7 @@ def read(*names, **kwargs): 'psycopg2>=2.8.4', 'python-dateutil>=2.8.1', 'rq>=1.3.0', + 'celery[redis]>=4.4.2', 'sendgrid>=6.1.3', 'sqlalchemy>=1.3.15', 'werkzeug>=1.0.0', diff --git a/src/protean/core/broker/base.py b/src/protean/core/broker/base.py index 8216d81c..3e61de7a 100644 --- a/src/protean/core/broker/base.py +++ b/src/protean/core/broker/base.py @@ -1,4 +1,7 @@ # Standard Library Imports +import logging +import logging.config + from abc import abstractmethod from collections import defaultdict from collections.abc import Iterable @@ -6,6 +9,8 @@ from protean.domain import DomainObjects from protean.utils import fully_qualified_name +logger = logging.getLogger('protean.core.broker.base') + class _BrokerMetaclass(type): """ @@ -89,5 +94,6 @@ def register(self, initiator_cls, consumer_cls): for initiator in initiator_cls: if initiator.element_type == DomainObjects.DOMAIN_EVENT: self._subscribers[fully_qualified_name(initiator)].add(consumer_cls) + logger.debug(f"Registered Subscriber {consumer_cls.__name__} with broker {self.name}") else: self._command_handlers[fully_qualified_name(initiator)] = consumer_cls diff --git a/src/protean/domain.py b/src/protean/domain.py index e6b7dbd6..05e4dc79 100644 --- a/src/protean/domain.py +++ b/src/protean/domain.py @@ -893,6 +893,7 @@ def _initialize_brokers(self): configured_brokers = self.config['BROKERS'] broker_objects = {} + logger.debug("Initializing brokers...") if configured_brokers and isinstance(configured_brokers, dict): if 'default' not in configured_brokers: raise ConfigurationError( diff --git a/src/protean/impl/broker/celery_broker.py b/src/protean/impl/broker/celery_broker.py new file mode 100644 index 00000000..0afd6027 --- /dev/null +++ b/src/protean/impl/broker/celery_broker.py @@ -0,0 +1,97 @@ +# Standard Library Imports +import logging +import logging.config + +from collections.abc import Iterable + +from celery import Celery, Task +from kombu import Queue + +# Protean +from protean.core.broker.base import BaseBroker +from protean.core.domain_event import BaseDomainEvent +from protean.domain import DomainObjects +from protean.utils import fully_qualified_name +from protean.utils.inflection import underscore + +logger = logging.getLogger('protean.impl.broker.celery') + + +class ProteanTask(Task): + """The default base class for all Task classes constructed from Subscribers/Command Handlers. + """ + pass + + +class CeleryBroker(BaseBroker): + def __init__(self, name, domain, conn_info): + super().__init__(name, domain, conn_info) + self.celery_app = Celery( + broker=conn_info['URI'], + backend=conn_info['URI'], + ) + + self.celery_app.conf.update(enable_utc=True) + + # We construct queues dynamically when subscribers register + self.queues = [] + + def construct_and_register_celery_task(self, consumer_cls): + """Constructs a Celery-compliant Task class and also registers + Task with Celery App + + Arguments: + consumer_cls {BaseSubscriber} -- The Subscriber or Command Handler class + to be converted into a Celery Task + + Returns: + ProteanTask -- Decorated and Registered Celery Task class + """ + attrs = consumer_cls.__dict__ + custom_attrs = { + 'run': attrs['notify'], # `notify` is the method to run on event + 'name': underscore(fully_qualified_name(consumer_cls)), # `name` will be the same as the task's queue + } + attrs = {**attrs, **custom_attrs} + + # Construct `decorated_cls` dynamically from `ProteanTask`. + # `ProteanTask` acts as the base class for all celery tasks. + decorated_cls = type(consumer_cls.__name__ + 'Task', (ProteanTask, ), {**attrs}) + + # Register Task class with Celery app + decorated_cls_instance = self.celery_app.register_task(decorated_cls()) + + # Add to Queue so that workers pick it up automatically + self.queues.append(Queue(decorated_cls.name)) + + return decorated_cls_instance + + def register(self, initiator_cls, consumer_cls): + """Registers Domain Events and Commands with Subscribers/Command Handlers + + Arguments: + initiator_cls {list} -- One or more Domain Events or Commands + consumer_cls {Subscriber/CommandHandler} -- The consumer class connected to the Domain Event or Command + """ + if not isinstance(initiator_cls, Iterable): + initiator_cls = [initiator_cls] + + decorated_cls_instance = self.construct_and_register_celery_task(consumer_cls) + + for initiator in initiator_cls: + if initiator.element_type == DomainObjects.DOMAIN_EVENT: + self._subscribers[fully_qualified_name(initiator)].add(decorated_cls_instance) + logger.debug(f"Registered Subscriber {decorated_cls_instance.__class__.__name__} with queue " + "{self.celery_app.tasks} as Celery Task") + else: + self._command_handlers[fully_qualified_name(initiator)] = decorated_cls_instance + + def send_message(self, initiator_obj): + if isinstance(initiator_obj, BaseDomainEvent): + for subscriber in self._subscribers[fully_qualified_name(initiator_obj.__class__)]: + if self.conn_info['IS_ASYNC']: + subscriber.apply_async([initiator_obj.to_dict()], queue=subscriber.name) + else: + subscriber.apply([initiator_obj.to_dict()]) + else: + raise NotImplementedError diff --git a/src/protean/impl/broker/memory_broker.py b/src/protean/impl/broker/memory_broker.py index 2b4a79c4..b4ecc570 100644 --- a/src/protean/impl/broker/memory_broker.py +++ b/src/protean/impl/broker/memory_broker.py @@ -16,7 +16,7 @@ def send_message(self, initiator_obj): if isinstance(initiator_obj, BaseDomainEvent): for subscriber in self._subscribers[fully_qualified_name(initiator_obj.__class__)]: subscriber_object = subscriber(current_domain, initiator_obj.__class__) - subscriber_object.notify(initiator_obj) + subscriber_object.notify(initiator_obj.to_dict()) else: command_handler = self._command_handlers[fully_qualified_name(initiator_obj.__class__)] - command_handler.notify(initiator_obj) + command_handler.notify(initiator_obj.to_dict()) diff --git a/tests/command_handler/tests.py b/tests/command_handler/tests.py index 9186bdf2..91d850a1 100644 --- a/tests/command_handler/tests.py +++ b/tests/command_handler/tests.py @@ -43,4 +43,4 @@ def test_that_domain_event_is_received_from_aggregate_command_method(self, mock, command = AddPersonCommand(first_name='John', last_name='Doe', age=21) test_domain.publish_command(command) - mock.assert_called_once_with(command) + mock.assert_called_once_with(command.to_dict()) diff --git a/tests/email_provider/elements.py b/tests/email_provider/elements.py index f1f35cc2..644bddd3 100644 --- a/tests/email_provider/elements.py +++ b/tests/email_provider/elements.py @@ -66,5 +66,5 @@ class Meta: domain_event_cls = PersonAdded def notify(self, domain_event): - email = WelcomeEmail(to=domain_event.person['email'], data=domain_event.person) + email = WelcomeEmail(to=domain_event['person']['email'], data=domain_event['person']) current_domain.send_email(email) diff --git a/tests/impl/broker/celery_broker/__init__.py b/tests/impl/broker/celery_broker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/impl/broker/celery_broker/config.py b/tests/impl/broker/celery_broker/config.py new file mode 100644 index 00000000..86456d21 --- /dev/null +++ b/tests/impl/broker/celery_broker/config.py @@ -0,0 +1,76 @@ +# Protean +from protean.utils import Database, IdentityStrategy, IdentityType + +DEBUG = True +TESTING = True +ENV = 'development' + +# A secret key for this particular Protean installation. Used in secret-key +# hashing algorithms. +SECRET_KEY = 'tvTpk3PAfkGr5x9!2sFU%XpW7bR8cwKA' + +# Database Configuration +DATABASES = { + 'default': { + 'PROVIDER': 'protean.impl.repository.dict_repo.DictProvider', + }, + 'sqlite': { + 'PROVIDER': 'protean.impl.repository.sqlalchemy_repo.SAProvider', + 'DATABASE': Database.SQLITE.value, + 'DATABASE_URI': 'sqlite:///test.db', + }, +} + +# Identity strategy to use when persisting Entities/Aggregates. +# +# Options: +# +# * IdentityStrategy.UUID: Default option, and preferred. Identity is a UUID and generated during `build` time. +# Persisted along with other details into the data store. +# * IdentityStrategy.DATABASE: Let the database generate unique identity during persistence +# * IdentityStrategy.FUNCTION: Special function that returns a unique identifier +IDENTITY_STRATEGY = IdentityStrategy.UUID + +# Data type of Auto-Generated Identity Values +# +# Options: +# +# * INTEGER +# * STRING (Default) +IDENTITY_TYPE = IdentityType.STRING + +# Messaging Mediums +BROKERS = { + 'default': { + 'PROVIDER': 'protean.impl.broker.celery_broker.CeleryBroker', + 'URI': 'redis://127.0.0.1:6379/2', + 'IS_ASYNC': True, + }, +} + +LOGGING_CONFIG = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'console': { + 'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s', + }, + }, + 'handlers': { + 'console': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'formatter': 'console', + }, + }, + 'loggers': { + 'protean': { + 'handlers': ['console'], + 'level': 'DEBUG', + }, + 'rq.worker': { + 'handlers': ['console'], + 'level': 'DEBUG', + }, + }, +} diff --git a/tests/impl/broker/celery_broker/conftest.py b/tests/impl/broker/celery_broker/conftest.py new file mode 100644 index 00000000..457a26aa --- /dev/null +++ b/tests/impl/broker/celery_broker/conftest.py @@ -0,0 +1,59 @@ +# Standard Library Imports +import os + +# Protean +import pytest + +from redis import Redis + + +def initialize_domain(): + from protean.domain import Domain + domain = Domain('RQ Tests') + + # Construct relative path to config file + current_path = os.path.abspath(os.path.dirname(__file__)) + config_path = os.path.join(current_path, "./config.py") + + if os.path.exists(config_path): + domain.config.from_pyfile(config_path) + + domain.domain_context().push() + return domain + + +@pytest.fixture(autouse=True) +def test_domain(): + domain = initialize_domain() + + yield domain + + +@pytest.fixture(scope="module") +def test_domain_for_worker(): + domain = initialize_domain() + + yield domain + + +@pytest.fixture(scope="session", autouse=True) +def setup_redis(): + # Initialize Redis + # FIXME + + yield + + # Close connection to Redis + # FIXME + + +@pytest.fixture(autouse=True) +def run_around_tests(test_domain): + + yield + + # Flush all in Redis + # FIXME + + if test_domain.has_provider('default'): + test_domain.get_provider('default')._data_reset() diff --git a/tests/impl/broker/celery_broker/elements.py b/tests/impl/broker/celery_broker/elements.py new file mode 100644 index 00000000..5706ead5 --- /dev/null +++ b/tests/impl/broker/celery_broker/elements.py @@ -0,0 +1,54 @@ +# Protean +from protean.core.aggregate import BaseAggregate +from protean.core.broker.subscriber import BaseSubscriber +from protean.core.domain_event import BaseDomainEvent +from protean.core.field.basic import Auto, Integer, String +from protean.globals import current_domain + + +class Person(BaseAggregate): + first_name = String(max_length=50, required=True) + last_name = String(max_length=50, required=True) + age = Integer(default=21) + + @classmethod + def add_newcomer(cls, person_dict): + """Factory method to add a new Person to the system""" + newcomer = Person( + first_name=person_dict['first_name'], + last_name=person_dict['last_name'], + age=person_dict['age'], + ) + + # Publish Event via the domain + current_domain.publish( + PersonAdded( + id=newcomer.id, + first_name=newcomer.first_name, + last_name=newcomer.last_name, + age=newcomer.age, + )) + + return newcomer + + +class PersonAdded(BaseDomainEvent): + id = Auto(identifier=True) + first_name = String(max_length=50, required=True) + last_name = String(max_length=50, required=True) + age = Integer(default=21) + + +class NotifySSOSubscriber(BaseSubscriber): + """Subscriber that notifies an external SSO system + that a new person was added into the system + """ + + class Meta: + domain_event_cls = PersonAdded + + def notify(self, domain_event_dict): + print("Received Domain Event: ", domain_event_dict) + print("Domain Event class: ", self.meta_.domain_event_cls) + + print("Current domain: ", current_domain.domain_name) diff --git a/tests/impl/broker/celery_broker/test_subscriber.py b/tests/impl/broker/celery_broker/test_subscriber.py new file mode 100644 index 00000000..5cc390c7 --- /dev/null +++ b/tests/impl/broker/celery_broker/test_subscriber.py @@ -0,0 +1,38 @@ +import pytest + +from celery import Task + +from protean.globals import current_domain +from protean.impl.broker.celery_broker import CeleryBroker, ProteanTask + +from tests.impl.broker.celery_broker.elements import NotifySSOSubscriber, Person + + +class TestSubscriberNotifications: + @pytest.fixture(autouse=True) + def register(self): + current_domain.register(Person) + current_domain.register(NotifySSOSubscriber) + + @pytest.fixture + def broker(self): + return current_domain.get_broker('default') + + @pytest.fixture + def decorated_task_obj(self, broker): + return broker.construct_and_register_celery_task(NotifySSOSubscriber) + + def test_that_broker_is_celery(self, broker): + assert isinstance(broker, CeleryBroker) + + def test_task_class_construction_and_registration(self, broker, decorated_task_obj): + assert decorated_task_obj is not None + assert isinstance(decorated_task_obj, ProteanTask) + assert isinstance(decorated_task_obj, Task) + + assert decorated_task_obj.name == 'tests.impl.broker.celery_broker.elements.notify_sso_subscriber' + assert decorated_task_obj.name in broker.celery_app.tasks + + @pytest.mark.skip(reason="Yet to implement") + def test_queue_associated_with_subscriber(self): + pass diff --git a/tests/impl/broker/celery_broker/tests.py b/tests/impl/broker/celery_broker/tests.py new file mode 100644 index 00000000..75d28f7e --- /dev/null +++ b/tests/impl/broker/celery_broker/tests.py @@ -0,0 +1,38 @@ +# Protean +import pytest + +from mock import patch +from protean.globals import current_domain +from protean.impl.broker.celery_broker import CeleryBroker +from tests.impl.broker.celery_broker.elements import NotifySSOSubscriber, Person, PersonAdded + + +class TestRedisConnection(): + def test_that_configured_broker_is_celery_with_redis(self): + assert current_domain.has_broker('default') + broker = current_domain.get_broker('default') + + assert isinstance(broker, CeleryBroker) + assert broker.conn_info['URI'] == 'redis://127.0.0.1:6379/2' + assert broker.celery_app is not None + + +class TestEventProcessing: + + @pytest.fixture(autouse=True) + def register(self): + current_domain.register(Person) + current_domain.register(NotifySSOSubscriber) + + @patch.object(CeleryBroker, 'send_message') + def test_that_an_event_is_published_to_the_broker(self, mock): + newcomer = Person.add_newcomer({'first_name': 'John', 'last_name': 'Doe', 'age': 21}) + mock.assert_called_once_with( + PersonAdded( + id=newcomer.id, + first_name='John', + last_name='Doe', + age=21)) + + def test_that_events_are_available_on_queue_after_publish(self): + Person.add_newcomer({'first_name': 'John', 'last_name': 'Doe', 'age': 21}) diff --git a/tests/subscriber/tests.py b/tests/subscriber/tests.py index f84715c4..b82681c5 100644 --- a/tests/subscriber/tests.py +++ b/tests/subscriber/tests.py @@ -40,4 +40,4 @@ def test_that_domain_event_is_received_from_aggregate_command_method(self, mock, test_domain.register(NotifySSOSubscriber) newcomer = Person.add_newcomer({'first_name': 'John', 'last_name': 'Doe', 'age': 21}) - mock.assert_called_once_with(PersonAdded(person=newcomer)) + mock.assert_called_once_with(PersonAdded(person=newcomer).to_dict())