Skip to content

Commit

Permalink
Fix to bring celery broker to new message format
Browse files Browse the repository at this point in the history
  • Loading branch information
subhashb committed Aug 10, 2021
1 parent 8b2039e commit 8c66a89
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Release History
===============

0.6.1
-----

* Fix to adapt celery broker to new message payload format

0.6.0
-----

Expand Down
8 changes: 4 additions & 4 deletions src/protean/adapters/broker/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
fetch_element_cls_from_registry,
fully_qualified_name,
)
from protean.utils.inflection import camelize, underscore
from protean.utils.inflection import underscore

logger = logging.getLogger("protean.adapters.celery")

Expand Down Expand Up @@ -97,12 +97,12 @@ def register(self, initiator_cls, consumer_cls):
def publish(self, message: Dict):
if message["type"] == MessageType.EVENT.value:
event_cls = fetch_element_cls_from_registry(
camelize(message["name"]), (DomainObjects.EVENT,)
message["name"], (DomainObjects.EVENT,)
)
for subscriber in self._subscribers[fully_qualified_name(event_cls)]:
if self.conn_info["IS_ASYNC"]:
subscriber.apply_async([message], queue=subscriber.name)
subscriber.apply_async([message["payload"]], queue=subscriber.name)
else:
subscriber.apply([message])
subscriber.apply([message["payload"]])
else:
raise NotImplementedError

0 comments on commit 8c66a89

Please sign in to comment.