Skip to content

Commit

Permalink
Merge pull request #321 from phenobarbital/dev
Browse files Browse the repository at this point in the history
New Message Brokers
  • Loading branch information
phenobarbital authored Dec 2, 2024
2 parents e0b7bfa + 25863bb commit d82808a
Show file tree
Hide file tree
Showing 23 changed files with 1,180 additions and 338 deletions.
22 changes: 22 additions & 0 deletions examples/brokers/nav_rabbitmq_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.rabbitmq import RMQConsumer


async def rabbit_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

rmq = RMQConsumer(
callback=rabbit_callback
)
rmq.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
22 changes: 22 additions & 0 deletions examples/brokers/nav_redis_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.redis import RedisConsumer


async def redis_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

rmq = RedisConsumer(
callback=redis_callback
)
rmq.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
22 changes: 22 additions & 0 deletions examples/brokers/nav_sqs_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.sqs import SQSConsumer


async def sqs_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

sqs = SQSConsumer(
callback=sqs_callback
)
sqs.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
12 changes: 6 additions & 6 deletions examples/test_sqs_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ async def main():
)
async with connection as sqs:
# Create an SQS Queue
queue_name = "MainEvent"
queue_name = "navigator"
print(f"Creating queue: {queue_name}")
queue = await sqs.create_queue(queue_name)
queue_url = queue.url
print(f"Queue URL: {queue_url}")
# # Publish a JSON Message
# await sqs.publish_message("MyTestQueue", {"key": "value"})
# await sqs.publish_message({"key": "value"}, "MyTestQueue")
# # Publish JSONPickle
# model = ExampleModel(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", model)
# await sqs.publish_message(model, "MyTestQueue")
# # Dataclasses:
# mdl = Example(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", mdl)
# await sqs.publish_message(mdl, "MyTestQueue")

# # Publish CloudPickle
# class CustomWrapper:
# def __init__(self, data):
# self.data = data

# wrapper = CustomWrapper(data={"nested_key": "nested_value"})
# await sqs.publish_message("MyTestQueue", wrapper)
# await sqs.publish_message(wrapper, "MyTestQueue")

form = {
"metadata": {
Expand All @@ -70,7 +70,7 @@ async def main():
}
}
# Publish plain text
await sqs.publish_message("MainEvent", form)
await sqs.publish_message(form, "navigator")

if __name__ == "__main__":
try:
Expand Down
4 changes: 2 additions & 2 deletions navigator/applications/base.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ cdef class BaseApplication:
### Application handler:
self.handler = None
self.description: str = description
self.host = APP_HOST
self.port = APP_PORT
self.host = kwargs.pop('host', APP_HOST)
self.port = kwargs.pop('port', APP_PORT)
self.path = None
self.title = title if title else APP_NAME
self.contact = contact
Expand Down
33 changes: 30 additions & 3 deletions navigator/brokers/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from collections.abc import Awaitable, Callable
from abc import ABC, abstractmethod
import asyncio
from aiohttp import web
from navconfig.logging import logging
from navigator.applications.base import BaseApplication
from .pickle import DataSerializer


Expand All @@ -16,7 +18,8 @@ class BaseConnection(ABC):

def __init__(
self,
credentials: Union[str, dict],
*args,
credentials: Union[str, dict] = None,
timeout: Optional[int] = 5,
**kwargs
):
Expand All @@ -33,6 +36,7 @@ def __init__(
self.reconnect_delay = 1 # Initial delay in seconds
self._lock = asyncio.Lock()
self._serializer = DataSerializer()
super().__init__(*args, **kwargs)

def get_connection(self) -> Optional[Union[Callable, Awaitable]]:
if not self._connection:
Expand Down Expand Up @@ -73,9 +77,8 @@ async def ensure_connection(self) -> None:
@abstractmethod
async def publish_message(
self,
exchange_name: str,
routing_key: str,
body: Union[str, list, dict, Any],
queue_name: Optional[str] = None,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -105,3 +108,27 @@ async def process_message(
Process a message from the Broker Service.
"""
raise NotImplementedError

async def start(self, app: web.Application) -> None:
await self.connect()

async def stop(self, app: web.Application) -> None:
# close the RabbitMQ connection
await self.disconnect()

def setup(self, app: web.Application = None) -> None:
"""
Setup Broker Connection.
"""
if isinstance(app, BaseApplication):
self.app = app.get_app()
else:
self.app = app
if self.app is None:
raise ValueError(
'App is not defined.'
)
# Initialize the Producer instance.
self.app.on_startup.append(self.start)
self.app.on_shutdown.append(self.stop)
self.app[self._name_] = self
53 changes: 53 additions & 0 deletions navigator/brokers/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Union, Optional, Any
from navconfig.logging import logging


class BrokerConsumer(ABC):
"""
Broker Consumer Interface.
"""
_name_: str = "broker_consumer"

def __init__(
self,
callback: Optional[Union[Awaitable, Callable]] = None,
**kwargs
):
self._queue_name = kwargs.get('queue_name', 'navigator')
self.logger = logging.getLogger('Broker.Consumer')
self._callback_ = callback if callback else self.subscriber_callback

@abstractmethod
async def event_subscribe(
self,
queue_name: str,
callback: Union[Callable, Awaitable]
) -> None:
"""
Subscribe to a Queue and consume messages.
"""
pass

@abstractmethod
async def subscriber_callback(
self,
message: Any,
body: str
) -> None:
"""
Default Callback for Event Subscription.
"""
pass

@abstractmethod
def wrap_callback(
self,
callback: Callable[[Any, str], Awaitable[None]],
requeue_on_fail: bool = False,
max_retries: int = 3
) -> Callable:
"""
Wrap the user-provided callback to handle message decoding and
acknowledgment.
"""
Loading

0 comments on commit d82808a

Please sign in to comment.