Skip to content

Commit

Permalink
Add support for Celery as Broker (#318)
Browse files Browse the repository at this point in the history
This PR adds support for using Celery as the background worker
for processing events and commands.
  • Loading branch information
subhashb authored May 1, 2020
1 parent 1b29014 commit 145a712
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 5 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def read(*names, **kwargs):
'psycopg2>=2.8.4',
'python-dateutil>=2.8.1',
'rq>=1.3.0',
'celery[redis]>=4.4.2',
'sendgrid>=6.1.3',
'sqlalchemy>=1.3.15',
'werkzeug>=1.0.0',
Expand Down
6 changes: 6 additions & 0 deletions src/protean/core/broker/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# Standard Library Imports
import logging
import logging.config

from abc import abstractmethod
from collections import defaultdict
from collections.abc import Iterable

from protean.domain import DomainObjects
from protean.utils import fully_qualified_name

logger = logging.getLogger('protean.core.broker.base')


class _BrokerMetaclass(type):
"""
Expand Down Expand Up @@ -89,5 +94,6 @@ def register(self, initiator_cls, consumer_cls):
for initiator in initiator_cls:
if initiator.element_type == DomainObjects.DOMAIN_EVENT:
self._subscribers[fully_qualified_name(initiator)].add(consumer_cls)
logger.debug(f"Registered Subscriber {consumer_cls.__name__} with broker {self.name}")
else:
self._command_handlers[fully_qualified_name(initiator)] = consumer_cls
1 change: 1 addition & 0 deletions src/protean/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ def _initialize_brokers(self):
configured_brokers = self.config['BROKERS']
broker_objects = {}

logger.debug("Initializing brokers...")
if configured_brokers and isinstance(configured_brokers, dict):
if 'default' not in configured_brokers:
raise ConfigurationError(
Expand Down
97 changes: 97 additions & 0 deletions src/protean/impl/broker/celery_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Standard Library Imports
import logging
import logging.config

from collections.abc import Iterable

from celery import Celery, Task
from kombu import Queue

# Protean
from protean.core.broker.base import BaseBroker
from protean.core.domain_event import BaseDomainEvent
from protean.domain import DomainObjects
from protean.utils import fully_qualified_name
from protean.utils.inflection import underscore

logger = logging.getLogger('protean.impl.broker.celery')


class ProteanTask(Task):
"""The default base class for all Task classes constructed from Subscribers/Command Handlers.
"""
pass


class CeleryBroker(BaseBroker):
def __init__(self, name, domain, conn_info):
super().__init__(name, domain, conn_info)
self.celery_app = Celery(
broker=conn_info['URI'],
backend=conn_info['URI'],
)

self.celery_app.conf.update(enable_utc=True)

# We construct queues dynamically when subscribers register
self.queues = []

def construct_and_register_celery_task(self, consumer_cls):
"""Constructs a Celery-compliant Task class and also registers
Task with Celery App
Arguments:
consumer_cls {BaseSubscriber} -- The Subscriber or Command Handler class
to be converted into a Celery Task
Returns:
ProteanTask -- Decorated and Registered Celery Task class
"""
attrs = consumer_cls.__dict__
custom_attrs = {
'run': attrs['notify'], # `notify` is the method to run on event
'name': underscore(fully_qualified_name(consumer_cls)), # `name` will be the same as the task's queue
}
attrs = {**attrs, **custom_attrs}

# Construct `decorated_cls` dynamically from `ProteanTask`.
# `ProteanTask` acts as the base class for all celery tasks.
decorated_cls = type(consumer_cls.__name__ + 'Task', (ProteanTask, ), {**attrs})

# Register Task class with Celery app
decorated_cls_instance = self.celery_app.register_task(decorated_cls())

# Add to Queue so that workers pick it up automatically
self.queues.append(Queue(decorated_cls.name))

return decorated_cls_instance

def register(self, initiator_cls, consumer_cls):
"""Registers Domain Events and Commands with Subscribers/Command Handlers
Arguments:
initiator_cls {list} -- One or more Domain Events or Commands
consumer_cls {Subscriber/CommandHandler} -- The consumer class connected to the Domain Event or Command
"""
if not isinstance(initiator_cls, Iterable):
initiator_cls = [initiator_cls]

decorated_cls_instance = self.construct_and_register_celery_task(consumer_cls)

for initiator in initiator_cls:
if initiator.element_type == DomainObjects.DOMAIN_EVENT:
self._subscribers[fully_qualified_name(initiator)].add(decorated_cls_instance)
logger.debug(f"Registered Subscriber {decorated_cls_instance.__class__.__name__} with queue "
"{self.celery_app.tasks} as Celery Task")
else:
self._command_handlers[fully_qualified_name(initiator)] = decorated_cls_instance

def send_message(self, initiator_obj):
if isinstance(initiator_obj, BaseDomainEvent):
for subscriber in self._subscribers[fully_qualified_name(initiator_obj.__class__)]:
if self.conn_info['IS_ASYNC']:
subscriber.apply_async([initiator_obj.to_dict()], queue=subscriber.name)
else:
subscriber.apply([initiator_obj.to_dict()])
else:
raise NotImplementedError
4 changes: 2 additions & 2 deletions src/protean/impl/broker/memory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def send_message(self, initiator_obj):
if isinstance(initiator_obj, BaseDomainEvent):
for subscriber in self._subscribers[fully_qualified_name(initiator_obj.__class__)]:
subscriber_object = subscriber(current_domain, initiator_obj.__class__)
subscriber_object.notify(initiator_obj)
subscriber_object.notify(initiator_obj.to_dict())
else:
command_handler = self._command_handlers[fully_qualified_name(initiator_obj.__class__)]
command_handler.notify(initiator_obj)
command_handler.notify(initiator_obj.to_dict())
2 changes: 1 addition & 1 deletion tests/command_handler/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ def test_that_domain_event_is_received_from_aggregate_command_method(self, mock,

command = AddPersonCommand(first_name='John', last_name='Doe', age=21)
test_domain.publish_command(command)
mock.assert_called_once_with(command)
mock.assert_called_once_with(command.to_dict())
2 changes: 1 addition & 1 deletion tests/email_provider/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ class Meta:
domain_event_cls = PersonAdded

def notify(self, domain_event):
email = WelcomeEmail(to=domain_event.person['email'], data=domain_event.person)
email = WelcomeEmail(to=domain_event['person']['email'], data=domain_event['person'])
current_domain.send_email(email)
Empty file.
76 changes: 76 additions & 0 deletions tests/impl/broker/celery_broker/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Protean
from protean.utils import Database, IdentityStrategy, IdentityType

DEBUG = True
TESTING = True
ENV = 'development'

# A secret key for this particular Protean installation. Used in secret-key
# hashing algorithms.
SECRET_KEY = 'tvTpk3PAfkGr5x9!2sFU%XpW7bR8cwKA'

# Database Configuration
DATABASES = {
'default': {
'PROVIDER': 'protean.impl.repository.dict_repo.DictProvider',
},
'sqlite': {
'PROVIDER': 'protean.impl.repository.sqlalchemy_repo.SAProvider',
'DATABASE': Database.SQLITE.value,
'DATABASE_URI': 'sqlite:///test.db',
},
}

# Identity strategy to use when persisting Entities/Aggregates.
#
# Options:
#
# * IdentityStrategy.UUID: Default option, and preferred. Identity is a UUID and generated during `build` time.
# Persisted along with other details into the data store.
# * IdentityStrategy.DATABASE: Let the database generate unique identity during persistence
# * IdentityStrategy.FUNCTION: Special function that returns a unique identifier
IDENTITY_STRATEGY = IdentityStrategy.UUID

# Data type of Auto-Generated Identity Values
#
# Options:
#
# * INTEGER
# * STRING (Default)
IDENTITY_TYPE = IdentityType.STRING

# Messaging Mediums
BROKERS = {
'default': {
'PROVIDER': 'protean.impl.broker.celery_broker.CeleryBroker',
'URI': 'redis://127.0.0.1:6379/2',
'IS_ASYNC': True,
},
}

LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'console': {
'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
},
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'console',
},
},
'loggers': {
'protean': {
'handlers': ['console'],
'level': 'DEBUG',
},
'rq.worker': {
'handlers': ['console'],
'level': 'DEBUG',
},
},
}
59 changes: 59 additions & 0 deletions tests/impl/broker/celery_broker/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Standard Library Imports
import os

# Protean
import pytest

from redis import Redis


def initialize_domain():
from protean.domain import Domain
domain = Domain('RQ Tests')

# Construct relative path to config file
current_path = os.path.abspath(os.path.dirname(__file__))
config_path = os.path.join(current_path, "./config.py")

if os.path.exists(config_path):
domain.config.from_pyfile(config_path)

domain.domain_context().push()
return domain


@pytest.fixture(autouse=True)
def test_domain():
domain = initialize_domain()

yield domain


@pytest.fixture(scope="module")
def test_domain_for_worker():
domain = initialize_domain()

yield domain


@pytest.fixture(scope="session", autouse=True)
def setup_redis():
# Initialize Redis
# FIXME

yield

# Close connection to Redis
# FIXME


@pytest.fixture(autouse=True)
def run_around_tests(test_domain):

yield

# Flush all in Redis
# FIXME

if test_domain.has_provider('default'):
test_domain.get_provider('default')._data_reset()
54 changes: 54 additions & 0 deletions tests/impl/broker/celery_broker/elements.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Protean
from protean.core.aggregate import BaseAggregate
from protean.core.broker.subscriber import BaseSubscriber
from protean.core.domain_event import BaseDomainEvent
from protean.core.field.basic import Auto, Integer, String
from protean.globals import current_domain


class Person(BaseAggregate):
first_name = String(max_length=50, required=True)
last_name = String(max_length=50, required=True)
age = Integer(default=21)

@classmethod
def add_newcomer(cls, person_dict):
"""Factory method to add a new Person to the system"""
newcomer = Person(
first_name=person_dict['first_name'],
last_name=person_dict['last_name'],
age=person_dict['age'],
)

# Publish Event via the domain
current_domain.publish(
PersonAdded(
id=newcomer.id,
first_name=newcomer.first_name,
last_name=newcomer.last_name,
age=newcomer.age,
))

return newcomer


class PersonAdded(BaseDomainEvent):
id = Auto(identifier=True)
first_name = String(max_length=50, required=True)
last_name = String(max_length=50, required=True)
age = Integer(default=21)


class NotifySSOSubscriber(BaseSubscriber):
"""Subscriber that notifies an external SSO system
that a new person was added into the system
"""

class Meta:
domain_event_cls = PersonAdded

def notify(self, domain_event_dict):
print("Received Domain Event: ", domain_event_dict)
print("Domain Event class: ", self.meta_.domain_event_cls)

print("Current domain: ", current_domain.domain_name)
38 changes: 38 additions & 0 deletions tests/impl/broker/celery_broker/test_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

from celery import Task

from protean.globals import current_domain
from protean.impl.broker.celery_broker import CeleryBroker, ProteanTask

from tests.impl.broker.celery_broker.elements import NotifySSOSubscriber, Person


class TestSubscriberNotifications:
@pytest.fixture(autouse=True)
def register(self):
current_domain.register(Person)
current_domain.register(NotifySSOSubscriber)

@pytest.fixture
def broker(self):
return current_domain.get_broker('default')

@pytest.fixture
def decorated_task_obj(self, broker):
return broker.construct_and_register_celery_task(NotifySSOSubscriber)

def test_that_broker_is_celery(self, broker):
assert isinstance(broker, CeleryBroker)

def test_task_class_construction_and_registration(self, broker, decorated_task_obj):
assert decorated_task_obj is not None
assert isinstance(decorated_task_obj, ProteanTask)
assert isinstance(decorated_task_obj, Task)

assert decorated_task_obj.name == 'tests.impl.broker.celery_broker.elements.notify_sso_subscriber'
assert decorated_task_obj.name in broker.celery_app.tasks

@pytest.mark.skip(reason="Yet to implement")
def test_queue_associated_with_subscriber(self):
pass
Loading

0 comments on commit 145a712

Please sign in to comment.