From 8c66a89cbad1c4ddc83d82b33872365762014e7c Mon Sep 17 00:00:00 2001 From: Subhash Bhushan Date: Tue, 10 Aug 2021 14:54:03 -0700 Subject: [PATCH] Fix to bring celery broker to new message format --- CHANGELOG.rst | 5 +++++ src/protean/adapters/broker/celery.py | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c1b21050..a0b79960 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,11 @@ Release History =============== +0.6.1 +----- + +* Fix to adapt celery broker to new message payload format + 0.6.0 ----- diff --git a/src/protean/adapters/broker/celery.py b/src/protean/adapters/broker/celery.py index ee20573f..f3227254 100644 --- a/src/protean/adapters/broker/celery.py +++ b/src/protean/adapters/broker/celery.py @@ -14,7 +14,7 @@ fetch_element_cls_from_registry, fully_qualified_name, ) -from protean.utils.inflection import camelize, underscore +from protean.utils.inflection import underscore logger = logging.getLogger("protean.adapters.celery") @@ -97,12 +97,12 @@ def register(self, initiator_cls, consumer_cls): def publish(self, message: Dict): if message["type"] == MessageType.EVENT.value: event_cls = fetch_element_cls_from_registry( - camelize(message["name"]), (DomainObjects.EVENT,) + message["name"], (DomainObjects.EVENT,) ) for subscriber in self._subscribers[fully_qualified_name(event_cls)]: if self.conn_info["IS_ASYNC"]: - subscriber.apply_async([message], queue=subscriber.name) + subscriber.apply_async([message["payload"]], queue=subscriber.name) else: - subscriber.apply([message]) + subscriber.apply([message["payload"]]) else: raise NotImplementedError