From 5a20f5fd2f426a2f742e0013e2ca425ce009c529 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Sun, 1 Dec 2024 23:31:10 +0100 Subject: [PATCH 1/3] re-organize code for broker messages services (redis, rabbitmq, sqs) --- navigator/brokers/consumer.py | 97 ++++++ navigator/brokers/producer.py | 285 ++++++++++++++++++ navigator/brokers/rabbitmq/__init__.py | 1 + .../rabbitmq/{rabbit.py => connection.py} | 10 +- navigator/brokers/rabbitmq/consumer.py | 47 +-- navigator/brokers/rabbitmq/producer.py | 283 +---------------- navigator/brokers/sqs/__init__.py | 2 +- .../brokers/sqs/{sqs.py => connection.py} | 4 +- 8 files changed, 416 insertions(+), 313 deletions(-) create mode 100644 navigator/brokers/consumer.py create mode 100644 navigator/brokers/producer.py rename navigator/brokers/rabbitmq/{rabbit.py => connection.py} (97%) rename navigator/brokers/sqs/{sqs.py => connection.py} (99%) diff --git a/navigator/brokers/consumer.py b/navigator/brokers/consumer.py new file mode 100644 index 0000000..a06443b --- /dev/null +++ b/navigator/brokers/consumer.py @@ -0,0 +1,97 @@ +from abc import ABC, abstractmethod +from typing import Awaitable, Callable, Union, Optional, Any +from aiohttp import web +from navconfig.logging import logging +from navigator.applications.base import BaseApplication +from .connection import BaseConnection + + +class BrokerConsumer(BaseConnection, ABC): + """ + Broker Consumer Interface. + """ + _name_: str = "broker_consumer" + + def __init__( + self, + credentials: Union[str, dict], + timeout: Optional[int] = 5, + callback: Optional[Union[Awaitable, Callable]] = None, + **kwargs + ): + self._queue_name = kwargs.get('queue_name', 'navigator') + super(BrokerConsumer, self).__init__(credentials, timeout, **kwargs) + self.logger = logging.getLogger('Broker.Consumer') + self._callback_ = callback if callback else self.subscriber_callback + + @abstractmethod + async def connect(self): + """ + Connect to Broker. + """ + pass + + @abstractmethod + async def disconnect(self): + """ + Disconnect from Broker. + """ + pass + + async def start(self, app: web.Application) -> None: + await self.connect() + + async def stop(self, app: web.Application) -> None: + # close the RabbitMQ connection + await self.disconnect() + + def setup(self, app: web.Application = None) -> None: + """ + Setup BrokerManager. + """ + if isinstance(app, BaseApplication): + self.app = app.get_app() + else: + self.app = app + if self.app is None: + raise ValueError( + 'App is not defined.' + ) + # Initialize the Producer instance. + self.app.on_startup.append(self.start) + self.app.on_shutdown.append(self.stop) + self.app[self._name_] = self + + @abstractmethod + async def event_subscribe( + self, + queue_name: str, + callback: Union[Callable, Awaitable] + ) -> None: + """ + Subscribe to a Queue and consume messages. + """ + pass + + @abstractmethod + async def subscriber_callback( + self, + message: Any, + body: str + ) -> None: + """ + Default Callback for Event Subscription. + """ + pass + + @abstractmethod + def wrap_callback( + self, + callback: Callable[[Any, str], Awaitable[None]], + requeue_on_fail: bool = False, + max_retries: int = 3 + ) -> Callable: + """ + Wrap the user-provided callback to handle message decoding and + acknowledgment. 
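+
+        Illustrative usage (a sketch; ``handler`` is a placeholder, not
+        part of the API):
+
+            wrapped = self.wrap_callback(handler, requeue_on_fail=True)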
+ """ diff --git a/navigator/brokers/producer.py b/navigator/brokers/producer.py new file mode 100644 index 0000000..a1799cc --- /dev/null +++ b/navigator/brokers/producer.py @@ -0,0 +1,285 @@ +from abc import ABC, abstractmethod +from typing import Awaitable, Callable, Union, Optional, Any +import asyncio +from functools import wraps +from aiohttp import web +from navconfig.logging import logging +from navigator_session import get_session +from navigator_auth.conf import ( + AUTH_SESSION_OBJECT +) +from navigator.applications.base import BaseApplication +from .connection import BaseConnection +from ..conf import BROKER_MANAGER_QUEUE_SIZE + + +class BrokerProducer(BaseConnection, ABC): + """ + Broker Producer Interface. + + Args: + credentials: Message Queue Credentials. + queue_size: Size of Asyncio Queue for enqueuing messages before send. + num_workers: Number of workers to process the queue. + timeout: Timeout for MQ Connection. + + """ + _name_: str = "broker_producer" + + def __init__( + self, + credentials: Union[str, dict], + queue_size: Optional[int] = None, + num_workers: Optional[int] = 4, + timeout: Optional[int] = 5, + **kwargs + ): + self.queue_size = queue_size if queue_size else BROKER_MANAGER_QUEUE_SIZE + self.app: Optional[BaseApplication] = None + self.timeout: int = timeout + self.logger = logging.getLogger('Broker.Producer') + self.event_queue = asyncio.Queue(maxsize=self.queue_size) + self._num_workers = num_workers + self._workers = [] + self._broker_service: str = kwargs.get('broker_service', 'rabbitmq') + super(BrokerProducer, self).__init__(credentials, timeout, **kwargs) + + def setup(self, app: web.Application = None) -> None: + """ + Setup BrokerManager. + """ + if isinstance(app, BaseApplication): + self.app = app.get_app() + else: + self.app = app + if self.app is None: + raise ValueError( + 'App is not defined.' + ) + # Initialize the Producer instance. + self.app.on_startup.append(self.start) + self.app.on_shutdown.append(self.stop) + self.app[self._name_] = self + ## Generic Event Subscription: + self.app.router.add_post( + f'/api/v1/broker/{self._broker_service}/publish_event', + self.event_publisher + ) + self.logger.notice( + ":: Starting Message Queue Producer ::" + ) + + async def start_workers(self): + """ + Start the Queue workers. + """ + for i in range(self._num_workers): + task = asyncio.create_task( + self._event_broker(i) + ) + self._workers.append(task) + + async def start(self, app: web.Application) -> None: + """Signal Function to be called when the application is started. + + Connect to Message Queue, starts the Queue and start publishing. + """ + await self.connect() + # Start the Queue workers + await self.start_workers() + + async def stop(self, app: web.Application) -> None: + # Wait for all events to be processed + await self.event_queue.join() + + # Cancel worker tasks + for task in self._workers: + try: + task.cancel() + except asyncio.CancelledError: + pass + + # Wait for worker tasks to finish + await asyncio.gather(*self._workers, return_exceptions=True) + + # then, close the Message Queue connection + await self.disconnect() + + async def queue_event( + self, + exchange: str, + routing_key: str, + body: str, + **kwargs + ) -> None: + """ + Puts an Event into the Queue to be processed for Producer later. 
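+
+        Illustrative call (a sketch; assumes a connected producer):
+
+            await producer.queue_event(
+                exchange='navigator',
+                routing_key='events.ping',
+                body='{"status": "ok"}'
+            )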
+        """
+        try:
+            self.event_queue.put_nowait(
+                {
+                    'exchange': exchange,
+                    'routing_key': routing_key,
+                    'body': body,
+                    **kwargs
+                }
+            )
+            await asyncio.sleep(.1)
+        except asyncio.QueueFull:
+            self.logger.error(
+                "Event queue is full. Event will not be published."
+            )
+            raise
+
+    async def publish_event(
+        self,
+        exchange: str,
+        routing_key: str,
+        body: str,
+        **kwargs
+    ) -> None:
+        """
+        Publish an Event on a Message Queue Exchange.
+        """
+        # Ensure the exchange exists before publishing
+        await self.publish_message(
+            exchange=exchange,
+            routing_key=routing_key,
+            body=body,
+            **kwargs
+        )
+
+    # Event Publisher: POST API for dispatching events to the Message Broker.
+    async def get_userid(self, session, idx: str = 'user_id') -> int:
+        try:
+            if AUTH_SESSION_OBJECT in session:
+                return session[AUTH_SESSION_OBJECT][idx]
+            else:
+                return session[idx]
+        except KeyError:
+            raise RuntimeError(
+                'User ID is not found in the session.'
+            )
+
+    @staticmethod
+    def service_auth(
+        fn: Callable[..., Awaitable[Any]]
+    ) -> Callable[..., Awaitable[Any]]:
+        @wraps(fn)
+        async def _wrap(self, request: web.Request, *args, **kwargs) -> Any:
+            ## get User Session:
+            try:
+                session = await get_session(request)
+            except (ValueError, RuntimeError) as err:
+                raise web.HTTPUnauthorized(
+                    reason=str(err)
+                )
+            if session:
+                self._userid = await self.get_userid(session)
+            # Perform your session and user ID checks here
+            if not self._userid:
+                raise web.HTTPUnauthorized(
+                    reason="User ID not found in session"
+                )
+            # TODO: Checking User Permissions:
+            return await fn(self, request, *args, **kwargs)
+        return _wrap
+
+    @service_auth
+    async def event_publisher(
+        self,
+        request: web.Request
+    ) -> web.Response:
+        """
+        Event Publisher.
+
+        Used as a REST API to send events to the Message Broker.
+        """
+        data = await request.json()
+        exc = data.get('exchange', 'navigator')
+        routing_key = data.get('routing_key')
+        if not routing_key:
+            return web.json_response(
+                {
+                    'status': 'error',
+                    'message': 'routing_key is required.'
+                },
+                status=422
+            )
+        body = data.get('body')
+        if not body:
+            return web.json_response(
+                {
+                    'status': 'error',
+                    'message': 'Message Body for Broker is required.'
+                },
+                status=422
+            )
+        try:
+            await self.queue_event(exc, routing_key, body)
+            return web.json_response({
+                'status': 'success',
+                'message': f'Event {exc}.{routing_key} Published Successfully.'
+            })
+        except asyncio.QueueFull:
+            return web.json_response(
+                {
+                    'status': 'error',
+                    'message': 'Event queue is full. Please try again later.'
+                },
+                status=429
+            )
+
+    async def _event_broker(self, worker_id: int):
+        """
+        Worker coroutine that publishes events from the internal queue to
+        the Message Broker.
+
+        The queue buffers events so none are lost while the publisher is
+        not yet ready.
+        Implements backpressure handling by retrying failed publishes.
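+
+        Retry behaviour (illustrative): the delay starts at 1 second and
+        doubles after each failed publish (1s, 2s, 4s, ...), giving up
+        after ``max_retries`` attempts (5 by default).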
+ """ + while True: + # Wait for an event to be available in the queue + event = await self.event_queue.get() + try: + # data: + routing = event.get('routing_key') + exchange = event.get('exchange') + body = event.get('body') + max_retries = event.get('max_retries', 5) + retry_count = 0 + retry_delay = 1 + while True: + try: + # Publish the event to RabbitMQ + await self.publish_message( + exchange_name=exchange, + routing_key=routing, + body=body + ) + self.logger.info( + f"Worker {worker_id} published event: {routing}" + ) + # TODO: Optionally, add the event to a dead-letter queue + # await self.dead_letter_queue.put(event) + break # Exit the retry loop on success + except Exception as e: + retry_count += 1 + if retry_count >= max_retries: + self.logger.error( + f"Worker {worker_id} failed to publish event: {e}" + ) + break # Exit the retry loop on max retries + self.logger.warning( + f"Worker {worker_id} failed to publish event: {e}. " + f"Retrying in {retry_delay} seconds..." + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + except Exception as e: + self.logger.error( + f"Error publishing event: {e}" + ) + finally: + self.event_queue.task_done() diff --git a/navigator/brokers/rabbitmq/__init__.py b/navigator/brokers/rabbitmq/__init__.py index c1e4edc..414bdb0 100644 --- a/navigator/brokers/rabbitmq/__init__.py +++ b/navigator/brokers/rabbitmq/__init__.py @@ -1,3 +1,4 @@ """ Using RabbitMQ as Message Broker. """ +from .connection import RabbitMQConnection diff --git a/navigator/brokers/rabbitmq/rabbit.py b/navigator/brokers/rabbitmq/connection.py similarity index 97% rename from navigator/brokers/rabbitmq/rabbit.py rename to navigator/brokers/rabbitmq/connection.py index 7845f26..a50c318 100644 --- a/navigator/brokers/rabbitmq/rabbit.py +++ b/navigator/brokers/rabbitmq/connection.py @@ -24,7 +24,7 @@ def __init__( timeout: Optional[int] = 5, **kwargs ): - self._dsn = rabbitmq_dsn if credentials is None else credentials + self._dsn = credentials if credentials is not None else rabbitmq_dsn super().__init__(credentials, timeout=timeout, **kwargs) self._connection: Optional[AbstractConnection] = None self._channel: Optional[AbstractChannel] = None @@ -354,7 +354,7 @@ async def wrapped_callback(message: aiormq.abc.DeliveredMessage): async def consume_messages( self, - queue: str, + queue_name: str, callback: Callable[[aiormq.abc.DeliveredMessage, str], Awaitable[None]], prefetch_count: int = 1 ) -> None: @@ -364,18 +364,18 @@ async def consume_messages( await self.ensure_connection() try: # Ensure the queue exists - await self._channel.queue_declare(queue=queue, durable=True) + await self._channel.queue_declare(queue=queue_name, durable=True) # Set QoS (Quality of Service) settings await self._channel.basic_qos(prefetch_count=prefetch_count) # Start consuming messages from the queue await self._channel.basic_consume( - queue=queue, + queue=queue_name, consumer_callback=self.wrap_callback(callback), ) self.logger.info( - f"Started consuming messages from queue '{queue}'." + f"Started consuming messages from queue '{queue_name}'." 
) except Exception as e: self.logger.error( diff --git a/navigator/brokers/rabbitmq/consumer.py b/navigator/brokers/rabbitmq/consumer.py index 4441473..96222e3 100644 --- a/navigator/brokers/rabbitmq/consumer.py +++ b/navigator/brokers/rabbitmq/consumer.py @@ -8,24 +8,25 @@ from aiohttp import web import aiormq from navconfig.logging import logging -from navigator.applications.base import BaseApplication -from .rabbit import RabbitMQConnection -from ..pickle import DataSerializer +from .connection import RabbitMQConnection +from ..consumer import BrokerConsumer # Disable Debug Logging for AIORMQ logging.getLogger('aiormq').setLevel(logging.INFO) -class BrokerConsumer(RabbitMQConnection): +class RMQConsumer(BrokerConsumer, RabbitMQConnection): """ + RMQConsumer. + Broker Client (Consumer) using RabbitMQ. """ - _name_: str = "broker_consumer" + _name_: str = "rabbitmq_consumer" def __init__( self, - dsn: Optional[str] = None, + credentials: Union[str, dict], timeout: Optional[int] = 5, callback: Optional[Union[Awaitable, Callable]] = None, **kwargs @@ -33,11 +34,8 @@ def __init__( self._routing_key = kwargs.get('routing_key', '*') self._exchange_type = kwargs.get('exchange_type', 'topic') self._exchange_name = kwargs.get('exchange_name', 'navigator') - self._queue_name = kwargs.get('queue_name', 'navigator') - super(BrokerConsumer, self).__init__(dsn, timeout, **kwargs) - self.logger = logging.getLogger('BrokerConsumer') - self._serializer = DataSerializer() - self._callback_ = callback if callback else self.subscriber_callback + super(BrokerConsumer, self).__init__(credentials, timeout, **kwargs) + self.logger = logging.getLogger('RMQConsumer') async def subscriber_callback( self, @@ -59,13 +57,13 @@ async def subscriber_callback( async def event_subscribe( self, - queue: str, + queue_name: str, callback: Union[Callable, Awaitable] ) -> None: """Event Subscribe. """ await self.consume_messages( - queue=queue, + queue_name=queue_name, callback=self.wrap_callback(callback) ) @@ -123,7 +121,7 @@ async def start(self, app: web.Application) -> None: Connect to RabbitMQ, and start consuming. """ - await self.connect() + await super().start(app) await self.subscribe_to_events( exchange=self._exchange_name, queue_name=self._queue_name, @@ -134,24 +132,3 @@ async def start(self, app: web.Application) -> None: prefetch_count=1, requeue_on_fail=True, ) - - async def stop(self, app: web.Application) -> None: - # close the RabbitMQ connection - await self.disconnect() - - def setup(self, app: web.Application = None) -> None: - """ - Setup BrokerManager. - """ - if isinstance(app, BaseApplication): - self.app = app.get_app() - else: - self.app = app - if self.app is None: - raise ValueError( - 'App is not defined.' - ) - # Initialize the Producer instance. - self.app.on_startup.append(self.start) - self.app.on_shutdown.append(self.stop) - self.app[self._name_] = self diff --git a/navigator/brokers/rabbitmq/producer.py b/navigator/brokers/rabbitmq/producer.py index 1cce95e..3428266 100644 --- a/navigator/brokers/rabbitmq/producer.py +++ b/navigator/brokers/rabbitmq/producer.py @@ -3,297 +3,40 @@ can be used to send messages to RabbitMQ. 
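+
+Illustrative setup (a sketch; assumes a reachable RabbitMQ broker and a
+Navigator app):
+
+    producer = RMQProducer(credentials='amqp://guest:guest@localhost:5672/navigator')
+    producer.setup(app)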
""" -from typing import Any, Optional -from collections.abc import Callable, Awaitable -import asyncio -from functools import wraps -from aiohttp import web +from typing import Union, Optional from navconfig.logging import logging -from navconfig.utils.types import Singleton -from navigator.applications.base import BaseApplication -from navigator_session import get_session -from navigator_auth.conf import ( - AUTH_SESSION_OBJECT -) -from ...conf import BROKER_MANAGER_QUEUE_SIZE -from .rabbit import RabbitMQConnection -from ..pickle import DataSerializer +from .connection import RabbitMQConnection +from .producer import BrokerProducer # Disable Debug Logging for AIORMQ logging.getLogger('aiormq').setLevel(logging.INFO) +class RMQProducer(BrokerProducer, RabbitMQConnection): + """RMQProducer. -class BrokerManager(RabbitMQConnection, metaclass=Singleton): - """BrokerManager. - - BrokerManager is the Producer functionality for RabbitMQ using aiormq. - + RMQProducer is the Producer functionality for RabbitMQ using aiormq. Args: dsn: RabbitMQ DSN. queue_size: Size of Asyncio Queue for enqueuing messages before send. num_workers: Number of workers to process the queue. timeout: Timeout for RabbitMQ Connection. - """ - _name_: str = "broker_manager" - __initialized__: bool = False + _name_: str = "rabbitmq_producer" def __init__( self, - dsn: Optional[str] = None, + credentials: Union[str, dict], queue_size: Optional[int] = None, num_workers: Optional[int] = 4, timeout: Optional[int] = 5, **kwargs ): - if self.__initialized__ is True: - return - self.queue_size = queue_size if queue_size else BROKER_MANAGER_QUEUE_SIZE - self.app: Optional[BaseApplication] = None - self.timeout: int = timeout - self.logger = logging.getLogger('BrokerManager') - self.event_queue = asyncio.Queue(maxsize=self.queue_size) - self._num_workers = num_workers - self._workers = [] - super(BrokerManager, self).__init__(dsn, timeout, **kwargs) - self.__initialized__ = True - self._serializer = DataSerializer() - - def setup(self, app: web.Application = None) -> None: - """ - Setup BrokerManager. - """ - if isinstance(app, BaseApplication): - self.app = app.get_app() - else: - self.app = app - if self.app is None: - raise ValueError( - 'App is not defined.' - ) - # Initialize the Producer instance. - self.app.on_startup.append(self.open) - self.app.on_shutdown.append(self.close) - self.app[self._name_] = self - ## Generic Event Subscription: - self.app.router.add_post( - '/api/v1/broker/events/publish_event', - self.event_publisher - ) - self.logger.notice( - ":: Starting RabbitMQ Producer ::" - ) - - async def start_workers(self): - for i in range(self._num_workers): - task = asyncio.create_task( - self._event_broker(i) - ) - self._workers.append(task) - - async def _event_broker(self, worker_id: int): - """ - Creates a Publisher event publisher. - Worker function to publish events from the queue to RabbitMQ. - - will check if there is something available in the temporal queue. - Queue is used to avoid losing things if the publisher is not ready. - Implements backpressure handling by retrying failed publishes. 
- """ - while True: - # Wait for an event to be available in the queue - event = await self.event_queue.get() - try: - # data: - routing = event.get('routing_key') - exchange = event.get('exchange') - body = event.get('body') - max_retries = event.get('max_retries', 5) - retry_count = 0 - retry_delay = 1 - while True: - try: - # Publish the event to RabbitMQ - await self.publish_message( - exchange_name=exchange, - routing_key=routing, - body=body - ) - self.logger.info( - f"Worker {worker_id} published event: {routing}" - ) - # TODO: Optionally, add the event to a dead-letter queue - # await self.dead_letter_queue.put(event) - break # Exit the retry loop on success - except Exception as e: - retry_count += 1 - if retry_count >= max_retries: - self.logger.error( - f"Worker {worker_id} failed to publish event: {e}" - ) - break # Exit the retry loop on max retries - self.logger.warning( - f"Worker {worker_id} failed to publish event: {e}. " - f"Retrying in {retry_delay} seconds..." - ) - await asyncio.sleep(retry_delay) - retry_delay *= 2 # Exponential backoff - except Exception as e: - self.logger.error( - f"Error publishing event: {e}" - ) - finally: - self.event_queue.task_done() - - async def open(self, app: web.Application) -> None: - """Signal Function to be called when the application is started. - - Connect to RabbitMQ, starts the Queue and start publishing. - """ - await self.connect() - # Start the Queue workers - await self.start_workers() - - async def close(self, app: web.Application) -> None: - # Wait for all events to be processed - await self.event_queue.join() - - # Cancel worker tasks - for task in self._workers: - try: - task.cancel() - except asyncio.CancelledError: - pass - - # Wait for worker tasks to finish - await asyncio.gather(*self._workers, return_exceptions=True) - - # then, close the RabbitMQ connection - await self.disconnect() - - async def queue_event( - self, - exchange: str, - routing_key: str, - body: str, - **kwargs - ) -> None: - """ - Puts an Event into the Queue to be processed for Producer later. - """ - try: - self.event_queue.put_nowait( - { - 'exchange': exchange, - 'routing_key': routing_key, - 'body': body, - **kwargs - } - ) - await asyncio.sleep(.1) - except asyncio.QueueFull: - self.logger.error( - "Event queue is full. Event will not published." - ) - raise - - async def publish_event( - self, - exchange: str, - routing_key: str, - body: str, - **kwargs - ) -> None: - """ - Publish Event on a rabbitMQ Exchange. - """ - # Ensure the exchange exists before publishing - await self.publish_message( - exchange=exchange, - routing_key=routing_key, - body=body, + super(RMQProducer, self).__init__( + credentials=credentials, + queue_size=queue_size, + num_workers=num_workers, + timeout=timeout, **kwargs ) - - # Event Publisher: POST API for dispatching events to RabbitMQ. - async def get_userid(self, session, idx: str = 'user_id') -> int: - try: - if AUTH_SESSION_OBJECT in session: - return session[AUTH_SESSION_OBJECT][idx] - else: - return session[idx] - except KeyError: - raise RuntimeError( - 'User ID is not found in the session.' 
- ) - - @staticmethod - def service_auth( - fn: Callable[..., Awaitable[Any]] - ) -> Callable[..., Awaitable[Any]]: - @wraps(fn) - async def _wrap(self, request: web.Request, *args, **kwargs) -> Any: - ## get User Session: - try: - session = await get_session(request) - except (ValueError, RuntimeError) as err: - raise web.HTTPUnauthorized( - reason=str(err) - ) - if session: - self._userid = await self.get_userid(session) - # Perform your session and user ID checks here - if not self._userid: - raise web.HTTPUnauthorized( - reason="User ID not found in session" - ) - # TODO: Checking User Permissions: - return await fn(self, request, *args, **kwargs) - return _wrap - - @service_auth - async def event_publisher( - self, - request: web.Request - ) -> web.Response: - """ - Event Publisher. - - Uses as an REST API to send events to RabbitMQ. - """ - data = await request.json() - exc = data.get('exchange', 'navigator') - routing_key = data.get('routing_key') - if not routing_key: - return web.json_response( - { - 'status': 'error', - 'message': 'routing_key is required.' - }, - status=422 - ) - body = data.get('body') - if not body: - return web.json_response( - { - 'status': 'error', - 'message': 'Message Body for Broker is required.' - }, - status=422 - ) - try: - await self.queue_event(exc, routing_key, body) - return web.json_response({ - 'status': 'success', - 'message': f'Event {exc}.{routing_key} Published Successfully.' - }) - except asyncio.QueueFull: - return web.json_response( - { - 'status': 'error', - 'message': 'Event queue is full. Please try again later.' - }, - status=429 - ) diff --git a/navigator/brokers/sqs/__init__.py b/navigator/brokers/sqs/__init__.py index 5c6325c..1d16d7e 100644 --- a/navigator/brokers/sqs/__init__.py +++ b/navigator/brokers/sqs/__init__.py @@ -3,4 +3,4 @@ Using Amazon SQS as a Message Broker. 
""" -from .sqs import SQSConnection +from .connection import SQSConnection diff --git a/navigator/brokers/sqs/sqs.py b/navigator/brokers/sqs/connection.py similarity index 99% rename from navigator/brokers/sqs/sqs.py rename to navigator/brokers/sqs/connection.py index ad02190..b9d6deb 100644 --- a/navigator/brokers/sqs/sqs.py +++ b/navigator/brokers/sqs/connection.py @@ -3,9 +3,9 @@ """ from typing import Optional, Union, Any from collections.abc import Awaitable, Callable +from dataclasses import is_dataclass import asyncio import aioboto3 -from dataclasses import is_dataclass from datamodel import Model, BaseModel from navconfig import config from navconfig.logging import logging @@ -276,7 +276,7 @@ async def consume_messages( async def consume_message( self, queue_name: str, - callback: Optional[Callable[[dict, str], Awaitable[None]]] = None, + callback: Union[Callable, Awaitable[None]] = None, wait_time: int = 5, ) -> Optional[dict]: """ From 3b668e9e151748f464386cb4a34a1224331d7867 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 2 Dec 2024 03:48:27 +0100 Subject: [PATCH 2/3] rabbitMQ, SQS and Redis streams as message brokers --- examples/brokers/nav_rabbitmq_consumer.py | 22 ++ examples/brokers/nav_redis_consumer.py | 22 ++ examples/brokers/nav_sqs_consumer.py | 22 ++ examples/test_sqs_producer.py | 12 +- navigator/applications/base.pyx | 4 +- navigator/brokers/connection.py | 33 ++- navigator/brokers/consumer.py | 46 +--- navigator/brokers/producer.py | 39 ++- navigator/brokers/rabbitmq/__init__.py | 2 + navigator/brokers/rabbitmq/connection.py | 11 +- navigator/brokers/rabbitmq/consumer.py | 14 +- navigator/brokers/rabbitmq/producer.py | 4 +- navigator/brokers/redis/__init__.py | 6 + navigator/brokers/redis/connection.py | 284 ++++++++++++++++++++++ navigator/brokers/redis/consumer.py | 146 +++++++++++ navigator/brokers/redis/producer.py | 38 +++ navigator/brokers/sqs/__init__.py | 2 + navigator/brokers/sqs/connection.py | 13 +- navigator/brokers/sqs/consumer.py | 133 ++++++++++ navigator/brokers/sqs/producer.py | 38 +++ navigator/conf.py | 30 +++ navigator/navigator.py | 4 +- 22 files changed, 832 insertions(+), 93 deletions(-) create mode 100644 examples/brokers/nav_rabbitmq_consumer.py create mode 100644 examples/brokers/nav_redis_consumer.py create mode 100644 examples/brokers/nav_sqs_consumer.py create mode 100644 navigator/brokers/redis/__init__.py create mode 100644 navigator/brokers/redis/connection.py create mode 100644 navigator/brokers/redis/consumer.py create mode 100644 navigator/brokers/redis/producer.py create mode 100644 navigator/brokers/sqs/consumer.py create mode 100644 navigator/brokers/sqs/producer.py diff --git a/examples/brokers/nav_rabbitmq_consumer.py b/examples/brokers/nav_rabbitmq_consumer.py new file mode 100644 index 0000000..3bd04c1 --- /dev/null +++ b/examples/brokers/nav_rabbitmq_consumer.py @@ -0,0 +1,22 @@ +from navigator import Application +from navigator.brokers.rabbitmq import RMQConsumer + + +async def rabbit_callback(*args, **kwargs): + # Handle your SQS callback here + print('Received Message:', args, kwargs) + +app = Application( + port=5001 +) + +rmq = RMQConsumer( + callback=rabbit_callback +) +rmq.setup(app) + +if __name__ == '__main__': + try: + app.run() + except KeyboardInterrupt: + print('EXIT FROM APP =========') diff --git a/examples/brokers/nav_redis_consumer.py b/examples/brokers/nav_redis_consumer.py new file mode 100644 index 0000000..c56b902 --- /dev/null +++ b/examples/brokers/nav_redis_consumer.py @@ -0,0 +1,22 @@ +from 
navigator import Application +from navigator.brokers.redis import RedisConsumer + + +async def redis_callback(*args, **kwargs): + # Handle your SQS callback here + print('Received Message:', args, kwargs) + +app = Application( + port=5001 +) + +rmq = RedisConsumer( + callback=redis_callback +) +rmq.setup(app) + +if __name__ == '__main__': + try: + app.run() + except KeyboardInterrupt: + print('EXIT FROM APP =========') diff --git a/examples/brokers/nav_sqs_consumer.py b/examples/brokers/nav_sqs_consumer.py new file mode 100644 index 0000000..b567f87 --- /dev/null +++ b/examples/brokers/nav_sqs_consumer.py @@ -0,0 +1,22 @@ +from navigator import Application +from navigator.brokers.sqs import SQSConsumer + + +async def sqs_callback(*args, **kwargs): + # Handle your SQS callback here + print('Received Message:', args, kwargs) + +app = Application( + port=5001 +) + +sqs = SQSConsumer( + callback=sqs_callback +) +sqs.setup(app) + +if __name__ == '__main__': + try: + app.run() + except KeyboardInterrupt: + print('EXIT FROM APP =========') diff --git a/examples/test_sqs_producer.py b/examples/test_sqs_producer.py index 43026bc..17654f4 100644 --- a/examples/test_sqs_producer.py +++ b/examples/test_sqs_producer.py @@ -28,19 +28,19 @@ async def main(): ) async with connection as sqs: # Create an SQS Queue - queue_name = "MainEvent" + queue_name = "navigator" print(f"Creating queue: {queue_name}") queue = await sqs.create_queue(queue_name) queue_url = queue.url print(f"Queue URL: {queue_url}") # # Publish a JSON Message - # await sqs.publish_message("MyTestQueue", {"key": "value"}) + # await sqs.publish_message({"key": "value"}, "MyTestQueue") # # Publish JSONPickle # model = ExampleModel(name="John Doe", age=30) - # await sqs.publish_message("MyTestQueue", model) + # await sqs.publish_message(model, "MyTestQueue") # # Dataclasses: # mdl = Example(name="John Doe", age=30) - # await sqs.publish_message("MyTestQueue", mdl) + # await sqs.publish_message(mdl, "MyTestQueue") # # Publish CloudPickle # class CustomWrapper: @@ -48,7 +48,7 @@ async def main(): # self.data = data # wrapper = CustomWrapper(data={"nested_key": "nested_value"}) - # await sqs.publish_message("MyTestQueue", wrapper) + # await sqs.publish_message(wrapper, "MyTestQueue") form = { "metadata": { @@ -70,7 +70,7 @@ async def main(): } } # Publish plain text - await sqs.publish_message("MainEvent", form) + await sqs.publish_message(form, "navigator") if __name__ == "__main__": try: diff --git a/navigator/applications/base.pyx b/navigator/applications/base.pyx index 085a8a6..246303d 100644 --- a/navigator/applications/base.pyx +++ b/navigator/applications/base.pyx @@ -31,8 +31,8 @@ cdef class BaseApplication: ### Application handler: self.handler = None self.description: str = description - self.host = APP_HOST - self.port = APP_PORT + self.host = kwargs.pop('host', APP_HOST) + self.port = kwargs.pop('port', APP_PORT) self.path = None self.title = title if title else APP_NAME self.contact = contact diff --git a/navigator/brokers/connection.py b/navigator/brokers/connection.py index 4a399da..352d048 100644 --- a/navigator/brokers/connection.py +++ b/navigator/brokers/connection.py @@ -5,7 +5,9 @@ from collections.abc import Awaitable, Callable from abc import ABC, abstractmethod import asyncio +from aiohttp import web from navconfig.logging import logging +from navigator.applications.base import BaseApplication from .pickle import DataSerializer @@ -16,7 +18,8 @@ class BaseConnection(ABC): def __init__( self, - credentials: Union[str, 
dict], + *args, + credentials: Union[str, dict] = None, timeout: Optional[int] = 5, **kwargs ): @@ -33,6 +36,7 @@ def __init__( self.reconnect_delay = 1 # Initial delay in seconds self._lock = asyncio.Lock() self._serializer = DataSerializer() + super().__init__(*args, **kwargs) def get_connection(self) -> Optional[Union[Callable, Awaitable]]: if not self._connection: @@ -73,9 +77,8 @@ async def ensure_connection(self) -> None: @abstractmethod async def publish_message( self, - exchange_name: str, - routing_key: str, body: Union[str, list, dict, Any], + queue_name: Optional[str] = None, **kwargs ) -> None: """ @@ -105,3 +108,27 @@ async def process_message( Process a message from the Broker Service. """ raise NotImplementedError + + async def start(self, app: web.Application) -> None: + await self.connect() + + async def stop(self, app: web.Application) -> None: + # close the RabbitMQ connection + await self.disconnect() + + def setup(self, app: web.Application = None) -> None: + """ + Setup Broker Connection. + """ + if isinstance(app, BaseApplication): + self.app = app.get_app() + else: + self.app = app + if self.app is None: + raise ValueError( + 'App is not defined.' + ) + # Initialize the Producer instance. + self.app.on_startup.append(self.start) + self.app.on_shutdown.append(self.stop) + self.app[self._name_] = self diff --git a/navigator/brokers/consumer.py b/navigator/brokers/consumer.py index a06443b..62346f6 100644 --- a/navigator/brokers/consumer.py +++ b/navigator/brokers/consumer.py @@ -1,12 +1,9 @@ from abc import ABC, abstractmethod from typing import Awaitable, Callable, Union, Optional, Any -from aiohttp import web from navconfig.logging import logging -from navigator.applications.base import BaseApplication -from .connection import BaseConnection -class BrokerConsumer(BaseConnection, ABC): +class BrokerConsumer(ABC): """ Broker Consumer Interface. """ @@ -14,54 +11,13 @@ class BrokerConsumer(BaseConnection, ABC): def __init__( self, - credentials: Union[str, dict], - timeout: Optional[int] = 5, callback: Optional[Union[Awaitable, Callable]] = None, **kwargs ): self._queue_name = kwargs.get('queue_name', 'navigator') - super(BrokerConsumer, self).__init__(credentials, timeout, **kwargs) self.logger = logging.getLogger('Broker.Consumer') self._callback_ = callback if callback else self.subscriber_callback - @abstractmethod - async def connect(self): - """ - Connect to Broker. - """ - pass - - @abstractmethod - async def disconnect(self): - """ - Disconnect from Broker. - """ - pass - - async def start(self, app: web.Application) -> None: - await self.connect() - - async def stop(self, app: web.Application) -> None: - # close the RabbitMQ connection - await self.disconnect() - - def setup(self, app: web.Application = None) -> None: - """ - Setup BrokerManager. - """ - if isinstance(app, BaseApplication): - self.app = app.get_app() - else: - self.app = app - if self.app is None: - raise ValueError( - 'App is not defined.' - ) - # Initialize the Producer instance. 
- self.app.on_startup.append(self.start) - self.app.on_shutdown.append(self.stop) - self.app[self._name_] = self - @abstractmethod async def event_subscribe( self, diff --git a/navigator/brokers/producer.py b/navigator/brokers/producer.py index a1799cc..6f57e99 100644 --- a/navigator/brokers/producer.py +++ b/navigator/brokers/producer.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC from typing import Awaitable, Callable, Union, Optional, Any import asyncio from functools import wraps @@ -107,9 +107,9 @@ async def stop(self, app: web.Application) -> None: async def queue_event( self, - exchange: str, - routing_key: str, body: str, + queue_name: str, + routing_key: Optional[str] = None, **kwargs ) -> None: """ @@ -118,9 +118,9 @@ async def queue_event( try: self.event_queue.put_nowait( { - 'exchange': exchange, - 'routing_key': routing_key, 'body': body, + 'queue_name': queue_name, + 'routing_key': routing_key, **kwargs } ) @@ -133,9 +133,8 @@ async def queue_event( async def publish_event( self, - exchange: str, - routing_key: str, body: str, + queue_name: str, **kwargs ) -> None: """ @@ -143,9 +142,8 @@ async def publish_event( """ # Ensure the exchange exists before publishing await self.publish_message( - exchange=exchange, - routing_key=routing_key, body=body, + queue_name=queue_name, **kwargs ) @@ -196,8 +194,8 @@ async def event_publisher( Uses as an REST API to send events to RabbitMQ. """ data = await request.json() - exc = data.get('exchange', 'navigator') - routing_key = data.get('routing_key') + qs = data.pop('queue_name', 'navigator') + routing_key = data.pop('routing_key', None) if not routing_key: return web.json_response( { @@ -206,7 +204,7 @@ async def event_publisher( }, status=422 ) - body = data.get('body') + body = data.pop('body') if not body: return web.json_response( { @@ -216,10 +214,10 @@ async def event_publisher( status=422 ) try: - await self.queue_event(exc, routing_key, body) + await self.queue_event(body, qs, routing_key, **data) return web.json_response({ 'status': 'success', - 'message': f'Event {exc}.{routing_key} Published Successfully.' + 'message': f'Event {qs}.{routing_key} Published Successfully.' }) except asyncio.QueueFull: return web.json_response( @@ -244,19 +242,20 @@ async def _event_broker(self, worker_id: int): event = await self.event_queue.get() try: # data: - routing = event.get('routing_key') - exchange = event.get('exchange') - body = event.get('body') - max_retries = event.get('max_retries', 5) + routing = event.pop('routing_key') + queue_name = event.pop('queue_name') + body = event.pop('body') + max_retries = event.pop('max_retries', 5) retry_count = 0 retry_delay = 1 while True: try: # Publish the event to RabbitMQ await self.publish_message( - exchange_name=exchange, + body=body, + queue_name=queue_name, routing_key=routing, - body=body + **event ) self.logger.info( f"Worker {worker_id} published event: {routing}" diff --git a/navigator/brokers/rabbitmq/__init__.py b/navigator/brokers/rabbitmq/__init__.py index 414bdb0..4256989 100644 --- a/navigator/brokers/rabbitmq/__init__.py +++ b/navigator/brokers/rabbitmq/__init__.py @@ -2,3 +2,5 @@ Using RabbitMQ as Message Broker. 
""" from .connection import RabbitMQConnection +from .consumer import RMQConsumer +from .producer import RMQProducer diff --git a/navigator/brokers/rabbitmq/connection.py b/navigator/brokers/rabbitmq/connection.py index a50c318..d19ba69 100644 --- a/navigator/brokers/rabbitmq/connection.py +++ b/navigator/brokers/rabbitmq/connection.py @@ -25,7 +25,8 @@ def __init__( **kwargs ): self._dsn = credentials if credentials is not None else rabbitmq_dsn - super().__init__(credentials, timeout=timeout, **kwargs) + print('DSN > ', rabbitmq_dsn) + super().__init__(credentials=credentials, timeout=timeout, **kwargs) self._connection: Optional[AbstractConnection] = None self._channel: Optional[AbstractChannel] = None @@ -184,9 +185,9 @@ async def ensure_exchange( async def publish_message( self, - exchange_name: str, - routing_key: str, body: Union[str, list, dict, Any], + queue_name: str, + routing_key: str, **kwargs ) -> None: """ @@ -194,7 +195,7 @@ async def publish_message( """ await self.ensure_connection() # Ensure the exchange exists before publishing - await self.ensure_exchange(exchange_name) + await self.ensure_exchange(queue_name) headers = kwargs.get('headers', {}) headers.setdefault('x-retry', '0') args = { @@ -222,7 +223,7 @@ async def publish_message( try: await self._channel.basic_publish( body.encode('utf-8'), - exchange=exchange_name, + exchange=queue_name, routing_key=routing_key, properties=aiormq.spec.Basic.Properties( **properties_kwargs diff --git a/navigator/brokers/rabbitmq/consumer.py b/navigator/brokers/rabbitmq/consumer.py index 96222e3..0adc817 100644 --- a/navigator/brokers/rabbitmq/consumer.py +++ b/navigator/brokers/rabbitmq/consumer.py @@ -16,7 +16,7 @@ logging.getLogger('aiormq').setLevel(logging.INFO) -class RMQConsumer(BrokerConsumer, RabbitMQConnection): +class RMQConsumer(RabbitMQConnection, BrokerConsumer): """ RMQConsumer. @@ -26,7 +26,7 @@ class RMQConsumer(BrokerConsumer, RabbitMQConnection): def __init__( self, - credentials: Union[str, dict], + credentials: Union[str, dict] = None, timeout: Optional[int] = 5, callback: Optional[Union[Awaitable, Callable]] = None, **kwargs @@ -34,7 +34,15 @@ def __init__( self._routing_key = kwargs.get('routing_key', '*') self._exchange_type = kwargs.get('exchange_type', 'topic') self._exchange_name = kwargs.get('exchange_name', 'navigator') - super(BrokerConsumer, self).__init__(credentials, timeout, **kwargs) + self._queue_name = kwargs.get('queue_name', None) + if self._queue_name: + self._exchange_name = self._queue_name + super().__init__( + credentials=credentials, + timeout=timeout, + callback=callback, + **kwargs + ) self.logger = logging.getLogger('RMQConsumer') async def subscriber_callback( diff --git a/navigator/brokers/rabbitmq/producer.py b/navigator/brokers/rabbitmq/producer.py index 3428266..bd9be3d 100644 --- a/navigator/brokers/rabbitmq/producer.py +++ b/navigator/brokers/rabbitmq/producer.py @@ -6,7 +6,7 @@ from typing import Union, Optional from navconfig.logging import logging from .connection import RabbitMQConnection -from .producer import BrokerProducer +from ..producer import BrokerProducer # Disable Debug Logging for AIORMQ @@ -18,7 +18,7 @@ class RMQProducer(BrokerProducer, RabbitMQConnection): RMQProducer is the Producer functionality for RabbitMQ using aiormq. Args: - dsn: RabbitMQ DSN. + credentials: RabbitMQ DSN. queue_size: Size of Asyncio Queue for enqueuing messages before send. num_workers: Number of workers to process the queue. timeout: Timeout for RabbitMQ Connection. 
diff --git a/navigator/brokers/redis/__init__.py b/navigator/brokers/redis/__init__.py new file mode 100644 index 0000000..7c6ed56 --- /dev/null +++ b/navigator/brokers/redis/__init__.py @@ -0,0 +1,6 @@ +"""Redis as Message Broker using Streams. + +""" +from .connection import RedisConnection +from .consumer import RedisConsumer +from .producer import RedisProducer diff --git a/navigator/brokers/redis/connection.py b/navigator/brokers/redis/connection.py new file mode 100644 index 0000000..37b0f5b --- /dev/null +++ b/navigator/brokers/redis/connection.py @@ -0,0 +1,284 @@ +""" +Redis interface (connection and disconnections) using Redis Streams. +""" +from typing import Optional, Union, Any, Dict +from collections.abc import Awaitable, Callable +import time +import asyncio +from dataclasses import is_dataclass +from redis import asyncio as aioredis +from datamodel import Model, BaseModel +from navconfig.logging import logging +from navigator.libs.json import json_encoder, json_decoder +from navigator.exceptions import ValidationError +from ..connection import BaseConnection +from ..wrapper import BaseWrapper +from ...conf import ( + REDIS_BROKER_HOST, + REDIS_BROKER_PORT, + REDIS_BROKER_PASSWORD, + REDIS_BROKER_DB, + REDIS_BROKER_URL +) + +class RedisConnection(BaseConnection): + """ + Manages connection and operations with Redis using Redis Streams. + """ + def __init__( + self, + credentials: Union[str, dict] = None, + timeout: Optional[int] = 5, + **kwargs + ): + self._name_ = self.__class__.__name__ + if not credentials: + credentials = {} + credentials['host'] = REDIS_BROKER_HOST + credentials['port'] = REDIS_BROKER_PORT + credentials['password'] = REDIS_BROKER_PASSWORD + credentials['db'] = REDIS_BROKER_DB + super().__init__(credentials=credentials, timeout=timeout, **kwargs) + self._connection: Optional[aioredis.Redis] = None + self.logger = logging.getLogger('RedisConnection') + self._group_name = kwargs.get('group_name', 'default_group') + self._consumer_name = kwargs.get('consumer_name', 'default_consumer') + self._queue_name = kwargs.get('queue_name', 'message_stream') + + async def connect(self): + """ + Establish connection with Redis. + """ + if self._connection: + return + try: + self.logger.info("Connecting to Redis...") + self._connection = aioredis.Redis( + **self._credentials, + decode_responses=True, + encoding='utf-8' + ) + # Ensure that the group exists + await self.ensure_group_exists() + self.logger.info("Connected to Redis.") + except Exception as e: + self.logger.error(f"Failed to connect to Redis: {e}") + raise + + async def ensure_connection(self) -> None: + """ + Ensures that the connection is active. + """ + if self._connection is None: + await self.connect() + + async def disconnect(self): + """ + Disconnect from Redis. + """ + self.logger.info("Disconnecting from Redis...") + if self._connection: + try: + await self._connection.close() + except Exception as e: + self.logger.error(f"Error closing Redis connection: {e}") + self._connection = None + self.logger.info("Disconnected from Redis.") + + async def ensure_group_exists(self): + """ + Ensure the consumer group exists for the stream. + """ + try: + # Create the stream if it doesn't exist + stream_exists = await self._connection.exists(self._queue_name) + if not stream_exists: + await self._connection.xadd(self._queue_name, {'initial': 'message'}) + # Try to create the group. This will fail if the group already exists. 
+ await self._connection.xgroup_create( + name=self._queue_name, + groupname=self._group_name, + id='0', + mkstream=True + ) + self.logger.info(f"Group '{self._group_name}' created on stream '{self._queue_name}'.") + except aioredis.ResponseError as e: + if "BUSYGROUP Consumer Group name already exists" in str(e): + self.logger.info( + f"Group '{self._group_name}' already exists." + ) + else: + self.logger.error( + f"Error creating group '{self._group_name}': {e}" + ) + raise + try: + # create the consumer: + await self._connection.xgroup_createconsumer( + self._queue_name, self._group_name, self._consumer_name + ) + self.logger.debug( + f":: Creating Consumer {self._consumer_name} on Stream {self._queue_name}" + ) + except Exception as exc: + self.logger.exception(exc, stack_info=True) + raise + + async def publish_message( + self, + body: Union[str, list, dict, Any], + queue_name: Optional[str] = None, + **kwargs + ): + """ + Publish a message to the specified Redis Stream. + """ + stream = queue_name or self._queue_name + try: + message_data = {} + # Determine serialization method based on the type of 'body' + if isinstance(body, (int, float, bool, None.__class__)): + # Use msgpack for primitives + packed_body = self._serializer.pack(body) + content_type = "application/msgpack" + elif isinstance(body, bytes): + # Use msgpack for raw bytes + packed_body = self._serializer.pack(body) + content_type = "application/msgpack" + elif isinstance(body, (dict, list)): + # Use JSON for dictionaries + packed_body = json_encoder(body) + content_type = "application/json" + elif is_dataclass(body) or isinstance(body, (Model, BaseModel)): + # JSONPickle serialization for dataclasses or BaseModel + packed_body = self._serializer.serialize(body) + content_type = "application/cloudpickle" + elif isinstance(body, BaseWrapper): + # CloudPickle serialization for BaseWrapper + packed_body = self._serializer.serialize(body) + content_type = "application/cloudpickle" + elif hasattr(body, "__class__") and not isinstance(body, (str, bytes)): + # CloudPickle serialization for other custom objects + packed_body = self._serializer.serialize(body) + content_type = "application/cloudpickle" + else: + # Fallback to plain text for str and other simple types + packed_body = str(body) + content_type = "text/plain" + + # TODO: add base64 encoding for binary data + message_data['body'] = packed_body + message_data['ContentType'] = content_type + + await self._connection.xadd(stream, message_data, nomkstream=False) + self.logger.info(f"Message published to stream '{stream}'.") + except Exception as e: + self.logger.error(f"Failed to publish message to stream '{stream}': {e}") + raise + + async def process_message(self, message_data: Dict[bytes, bytes]): + """ + Process the message received by the consumer. + """ + body = message_data.get('body') + content_type = message_data.get('ContentType', 'text/plain') + try: + if content_type == 'application/json': + return json_decoder(body) + elif content_type == "application/msgpack": + return self._serializer.unpack(body) + elif content_type == 'application/jsonpickle': + try: + return self._serializer.decode(body) + except Exception as e: + self.logger.error(f"Error decoding JSONPickle message: {e}") + return body + elif content_type == 'application/cloudpickle': + return self._serializer.unserialize(body) + elif content_type == 'text/plain': + return body + else: + self.logger.warning( + f"Unknown content type: {content_type}. Returning raw body." 
+ ) + return body + except ValidationError: + self.logger.warning("Error decoding message.") + return message_data.get(b'body') + except Exception as e: + self.logger.error(f"Failed to process message: {e}") + raise + + async def consume_messages( + self, + queue_name: Optional[str], + callback: Callable[[Dict[str, Any], str], Awaitable[None]], + count: int = 1, + block: int = 1000, + **kwargs + ): + """ + Consume messages from the specified Redis Stream and process them with the callback. + """ + stream = queue_name or self._queue_name + consumer_name = kwargs.get('consumer_name', self._consumer_name) + try: + # Clean up old messages before starting + await self.cleanup_old_messages(stream) + while True: + response = await self._connection.xreadgroup( + groupname=self._group_name, + consumername=consumer_name, + streams={stream: '>'}, + count=count, + block=block + ) + if not response: + await asyncio.sleep(1) + continue + for _, messages in response: + for message_id, message_data in messages: + try: + processed_message = await self.process_message(message_data) + data = { + "message_id": message_id, + "data": message_data + } + if asyncio.iscoroutinefunction(callback): + await callback(data, processed_message) + else: + callback(data, processed_message) + # Acknowledge the message + await self._connection.xack(stream, self._group_name, message_id) + self.logger.info( + f"Message {message_id} acknowledged." + ) + except Exception as e: + self.logger.error( + f"Error processing message {message_id}: {e}" + ) + except (asyncio.CancelledError, KeyboardInterrupt): + self.logger.info( + "Message consumption cancelled. Cleaning up..." + ) + raise + except Exception as e: + self.logger.error(f"Error consuming messages from stream '{stream}': {e}") + raise + + async def cleanup_old_messages(self, stream): + """Removes messages older than 7 days from the stream.""" + try: + # Calculate the timestamp for 7 days ago + seven_days_ago = int((time.time() - 7 * 24 * 60 * 60) * 1000) + # Convert it to a Redis Stream ID format (timestamp-part-sequence) + seven_days_ago_id = f"{seven_days_ago}-0" + # Use XTRIM with minid to remove messages older than the calculated timestamp + await self._connection.xtrim(stream, minid=seven_days_ago_id) + self.logger.info( + f"Cleaned up old messages from stream {stream}" + ) + except Exception as e: + self.logger.error( + f"Error cleaning up old messages: {e}" + ) diff --git a/navigator/brokers/redis/consumer.py b/navigator/brokers/redis/consumer.py new file mode 100644 index 0000000..3309fab --- /dev/null +++ b/navigator/brokers/redis/consumer.py @@ -0,0 +1,146 @@ +""" +Redis Consumer. + +Can be used to consume messages from Redis Streams. +""" +from typing import Union, Optional, Any +from collections.abc import Callable, Awaitable +import asyncio +from aiohttp import web +from navconfig.logging import logging +from .connection import RedisConnection +from ..consumer import BrokerConsumer + + +class RedisConsumer(RedisConnection, BrokerConsumer): + """ + RedisConsumer. + + Broker Client (Consumer) using Redis Streams. 
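+
+    Illustrative setup (a sketch; ``my_callback`` is a placeholder and a
+    reachable Redis is assumed):
+
+        consumer = RedisConsumer(callback=my_callback, queue_name='message_stream')
+        consumer.setup(app)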
+ """ + _name_: str = "redis_consumer" + + def __init__( + self, + credentials: Union[str, dict] = None, + timeout: Optional[int] = 5, + callback: Optional[Union[Awaitable, Callable]] = None, + **kwargs + ): + self._queue_name = kwargs.get('queue_name', 'message_stream') + self._group_name = kwargs.get('group_name', 'default_group') + self._consumer_name = kwargs.get('consumer_name', 'default_consumer') + super().__init__( + credentials=credentials, + timeout=timeout, + callback=callback, + queue_name=self._queue_name, + group_name=self._group_name, + consumer_name=self._consumer_name, + **kwargs + ) + self.logger = logging.getLogger('RedisConsumer') + self.consumer_task: Optional[asyncio.Task] = None + self._callback_ = callback if callback else self.subscriber_callback + + async def subscriber_callback( + self, + message_id: str, + body: Any + ) -> None: + """ + Default Callback for Event Subscription. + """ + try: + print(f"Received message ID: {message_id}") + print(f"Received Body: {body}") + self.logger.info(f'Received Message ID: {message_id} Body: {body}') + except Exception as e: + self.logger.error(f"Error in subscriber_callback: {e}") + raise + + def wrap_callback( + self, + callback: Callable[[Any, Any], Awaitable[None]], + ) -> Callable[[Any, Any], Awaitable[None]]: + """ + Wraps the user-provided callback for message handling. + """ + async def wrapped_callback(message_id, body): + try: + if asyncio.iscoroutinefunction(callback): + await callback(message_id, body) + else: + callback(message_id, body) + except Exception as e: + self.logger.error(f"Error processing message {message_id}: {e}") + return wrapped_callback + + async def event_subscribe( + self, + queue_name: Optional[str], + callback: Union[Callable, Awaitable], + **kwargs + ) -> None: + """Event Subscribe.""" + await self.consume_messages( + queue_name=queue_name, + callback=self.wrap_callback(callback), + **kwargs + ) + + async def subscribe_to_events( + self, + queue_name: Optional[str], + callback: Union[Callable, Awaitable], + **kwargs + ) -> None: + """ + Subscribe to events from a specific Stream. + """ + # Declare the stream and ensure group exists + await self.ensure_connection() + try: + self.logger.info(f"Starting Redis consumer for stream: {queue_name}") + self.consumer_task = asyncio.create_task( + self.consume_messages( + queue_name=queue_name, + callback=callback, + **kwargs + ) + ) + except Exception as e: + self.logger.error(f"Error subscribing to events: {e}") + raise + + async def stop_consumer(self): + """ + Stop the Redis consumer task gracefully. + """ + if self.consumer_task: + self.logger.info("Stopping Redis consumer...") + self.consumer_task.cancel() # Cancel the task + try: + await self.consumer_task # Await task cancellation + except asyncio.CancelledError: + self.logger.info("Redis consumer task cancelled.") + self.consumer_task = None + + async def start(self, app: web.Application) -> None: + """Signal Function to be called when the application is started. + + Connect to Redis, and start consuming. + """ + await super().start(app) + await self.subscribe_to_events( + queue_name=self._queue_name, + callback=self._callback_ + ) + + async def stop(self, app: web.Application) -> None: + """Signal Function to be called when the application is stopped. + + Stop consuming and disconnect from Redis. 
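+
+        The consumer task is cancelled first, so no message is read or
+        acknowledged over a connection that is about to close.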
+        """
+        await self.stop_consumer()
+        await super().stop(app)
diff --git a/navigator/brokers/redis/producer.py b/navigator/brokers/redis/producer.py
new file mode 100644
index 0000000..63509ae
--- /dev/null
+++ b/navigator/brokers/redis/producer.py
@@ -0,0 +1,38 @@
+"""
+Redis Producer Module.
+
+can be used to send messages to Redis Streams.
+"""
+from typing import Union, Optional
+from .connection import RedisConnection
+from ..producer import BrokerProducer
+
+
+class RedisProducer(RedisConnection, BrokerProducer):
+    """RedisProducer.
+
+    RedisProducer is the Producer functionality for the Message Queue using Redis Streams.
+
+    Args:
+        credentials: dictionary of redis credentials.
+        queue_size: Size of Asyncio Queue for enqueuing messages before send.
+        num_workers: Number of workers to process the queue.
+        timeout: Timeout for Redis Connection.
+    """
+    _name_: str = "redis_producer"
+
+    def __init__(
+        self,
+        credentials: Union[str, dict],
+        queue_size: Optional[int] = None,
+        num_workers: Optional[int] = 4,
+        timeout: Optional[int] = 5,
+        **kwargs
+    ):
+        super(RedisProducer, self).__init__(
+            credentials=credentials,
+            queue_size=queue_size,
+            num_workers=num_workers,
+            timeout=timeout,
+            **kwargs
+        )
diff --git a/navigator/brokers/sqs/__init__.py b/navigator/brokers/sqs/__init__.py
index 1d16d7e..ca4c418 100644
--- a/navigator/brokers/sqs/__init__.py
+++ b/navigator/brokers/sqs/__init__.py
@@ -4,3 +4,5 @@
 Using Amazon SQS as a Message Broker.
 """
 from .connection import SQSConnection
+from .consumer import SQSConsumer
+from .producer import SQSProducer
diff --git a/navigator/brokers/sqs/connection.py b/navigator/brokers/sqs/connection.py
index b9d6deb..45d6e94 100644
--- a/navigator/brokers/sqs/connection.py
+++ b/navigator/brokers/sqs/connection.py
@@ -5,6 +5,7 @@
 from collections.abc import Awaitable, Callable
 from dataclasses import is_dataclass
 import asyncio
+from asyncio import Task
 import aioboto3
 from datamodel import Model, BaseModel
 from navconfig import config
@@ -28,17 +29,19 @@ class SQSConnection(BaseConnection):
 
     def __init__(
         self,
-        credentials: Union[str, dict],
+        credentials: Union[str, dict] = None,
         timeout: Optional[int] = 5,
         **kwargs
     ):
         if not credentials:
+            credentials = {}
             credentials['aws_access_key_id'] = config.get('AWS_KEY')
             credentials['aws_secret_access_key'] = config.get('AWS_SECRET')
             credentials['region_name'] = config.get('AWS_REGION')
-        super().__init__(credentials, timeout, **kwargs)
+        super().__init__(credentials=credentials, timeout=timeout, **kwargs)
         self._connection = None
         self._session = None
+        self.consumer_task: Optional[Task] = None
 
     async def connect(self):
         """
@@ -119,16 +122,16 @@ async def ensure_queue(self, queue_name: str, attributes: Optional[dict] = None)
 
     async def publish_message(
         self,
+        body: Union[str, list, dict, Any],
         queue_name: str,
-        body: Union[str, dict, list, Any],
-        attributes: Optional[dict] = None,
+        **kwargs
     ):
         """
         Publish a message to the specified queue.
         """
         try:
             queue = await self.ensure_queue(queue_name)
-            message_attributes = attributes or {}
+            message_attributes = kwargs.get('attributes', {})
 
             # Determine serialization method based on the type of 'body'
             if isinstance(body, (int, float, bool, None.__class__)):
diff --git a/navigator/brokers/sqs/consumer.py b/navigator/brokers/sqs/consumer.py
new file mode 100644
index 0000000..08603c6
--- /dev/null
+++ b/navigator/brokers/sqs/consumer.py
@@ -0,0 +1,133 @@
+"""
+SQS Consumer.
+
+can be used to consume messages from Amazon SQS.
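+
+Illustrative setup (a sketch; ``sqs_callback`` is a placeholder and AWS
+credentials are assumed to be configured):
+
+    consumer = SQSConsumer(callback=sqs_callback, queue_name='navigator')
+    consumer.setup(app)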
+""" +from typing import Union, Optional, Any +from collections.abc import Callable, Awaitable +import asyncio +from aiohttp import web +from navconfig.logging import logging +from .connection import SQSConnection +from ..consumer import BrokerConsumer + + +class SQSConsumer(SQSConnection, BrokerConsumer): + """ + SQSConsumer. + + Broker Client (Consumer) using Amazon AWS SQS. + """ + _name_: str = "sqs_consumer" + + def __init__( + self, + credentials: Union[str, dict] = None, + timeout: Optional[int] = 5, + callback: Optional[Union[Awaitable, Callable]] = None, + **kwargs + ): + self._queue_name = kwargs.get('queue_name', 'navigator') + super().__init__( + credentials=credentials, + timeout=timeout, + callback=callback, + **kwargs + ) + self.logger = logging.getLogger('SQSConsumer') + + async def subscriber_callback( + self, + message: Any, + body: str + ) -> None: + """ + Default Callback for Event Subscription. + """ + try: + print(f"Received message: {message}") + print(f"Received Body: {body}") + self.logger.info(f'Received Message: {body}') + except Exception as e: + self.logger.error( + f"Error in subscriber_callback: {e}" + ) + raise + + async def event_subscribe( + self, + queue_name: str, + callback: Union[Callable, Awaitable] + ) -> None: + """Event Subscribe. + """ + await self.consume_messages( + queue_name=queue_name, + callback=self.wrap_callback(callback) + ) + + async def subscribe_to_events( + self, + queue_name: str, + callback: Union[Callable, Awaitable], + max_messages: int = 10, + wait_time: int = 10, + idle_sleep: int = 5, + **kwargs + ) -> None: + """ + Subscribe to events from a specific Queue. + """ + # Declare the queue + await self.ensure_connection() + try: + self.logger.notice( + f"Starting SQS consumer for queue: {queue_name}" + ) + self.consumer_task = asyncio.create_task( + self.consume_messages( + queue_name=queue_name, + callback=callback, + max_messages=max_messages, + wait_time=wait_time, + idle_sleep=idle_sleep, + **kwargs + ) + ) + except Exception as e: + self.logger.error( + f"Error subscribing to events: {e}" + ) + raise + + async def start(self, app: web.Application) -> None: + """Signal Function to be called when the application is started. + + Connect to RabbitMQ, and start consuming. + """ + await super().start(app) + await self.subscribe_to_events( + queue_name=self._queue_name, + callback=self._callback_ + ) + + async def stop(self, app: web.Application) -> None: + """Signal Function to be called when the application is stopped. + + Stop consuming and disconnect from SQS. + """ + await self.stop_consumer() + await super().stop(app) + + async def stop_consumer(self): + """ + Stop the SQS consumer task gracefully. + """ + if self.consumer_task: + self.logger.info("Stopping SQS consumer...") + self.consumer_task.cancel() # Cancel the task + try: + await self.consumer_task # Await task cancellation + except asyncio.CancelledError: + self.logger.info("SQS consumer task cancelled.") + self.consumer_task = None diff --git a/navigator/brokers/sqs/producer.py b/navigator/brokers/sqs/producer.py new file mode 100644 index 0000000..446442f --- /dev/null +++ b/navigator/brokers/sqs/producer.py @@ -0,0 +1,38 @@ +""" +RabbitMQ Producer Module. + +can be used to send messages to RabbitMQ. +""" +from typing import Union, Optional +from .connection import SQSConnection +from ..producer import BrokerProducer + + +class SQSProducer(SQSConnection, BrokerProducer): + """SQSProducer. + + SQSProducer is the Producer functionality for Message Queue using AWS SQS. 
diff --git a/navigator/brokers/sqs/producer.py b/navigator/brokers/sqs/producer.py
new file mode 100644
index 0000000..446442f
--- /dev/null
+++ b/navigator/brokers/sqs/producer.py
@@ -0,0 +1,38 @@
+"""
+SQS Producer Module.
+
+Can be used to send messages to Amazon SQS.
+"""
+from typing import Union, Optional
+from .connection import SQSConnection
+from ..producer import BrokerProducer
+
+
+class SQSProducer(SQSConnection, BrokerProducer):
+    """SQSProducer.
+
+    SQSProducer is the Producer functionality for Message Queue using AWS SQS.
+
+    Args:
+        credentials: AWS credentials.
+        queue_size: size of the asyncio Queue for enqueuing messages before sending.
+        num_workers: number of workers processing the queue.
+        timeout: timeout for the SQS connection.
+    """
+    _name_: str = "sqs_producer"
+
+    def __init__(
+        self,
+        credentials: Union[str, dict],
+        queue_size: Optional[int] = None,
+        num_workers: Optional[int] = 4,
+        timeout: Optional[int] = 5,
+        **kwargs
+    ):
+        super().__init__(
+            credentials=credentials,
+            queue_size=queue_size,
+            num_workers=num_workers,
+            timeout=timeout,
+            **kwargs
+        )
diff --git a/navigator/conf.py b/navigator/conf.py
index 195f1ba..5ddad33 100644
--- a/navigator/conf.py
+++ b/navigator/conf.py
@@ -208,6 +208,36 @@
 """
 QUEUE_CALLBACK = config.get('QUEUE_CALLBACK', fallback=None)
 
+"""
+Brokers:
+"""
+
+"""
+RabbitMQ Configuration.
+"""
+USE_RABBITMQ = config.getboolean('USE_RABBITMQ', fallback=False)
+RABBITMQ_HOST = config.get("RABBITMQ_HOST", fallback="localhost")
+RABBITMQ_PORT = config.get("RABBITMQ_PORT", fallback=5672)
+RABBITMQ_USER = config.get("RABBITMQ_USER", fallback="guest")
+RABBITMQ_PASS = config.get("RABBITMQ_PASS", fallback="guest")
+RABBITMQ_VHOST = config.get("RABBITMQ_VHOST", fallback="navigator")
+# RabbitMQ DSN
+rabbitmq_dsn = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}"
+BROKER_MANAGER_QUEUE_SIZE = config.getint(
+    "BROKER_MANAGER_QUEUE_SIZE",
+    fallback=4
+)
+
+"""
+Redis Configuration.
+"""
+REDIS_BROKER_HOST = config.get("REDIS_BROKER_HOST", fallback=CACHE_HOST)
+REDIS_BROKER_PORT = config.get("REDIS_BROKER_PORT", fallback=CACHE_PORT)
+REDIS_BROKER_PASSWORD = config.get("REDIS_BROKER_PASSWORD", fallback=None)
+REDIS_BROKER_DB = config.get("REDIS_BROKER_DB", fallback=CACHE_DB)
+REDIS_BROKER_URL = f"redis://{REDIS_BROKER_HOST}:{REDIS_BROKER_PORT}/{REDIS_BROKER_DB}"
+
+
 ### Zammad Integration via Actions:
 # Zammad:
 ZAMMAD_INSTANCE = config.get('ZAMMAD_INSTANCE')
diff --git a/navigator/navigator.py b/navigator/navigator.py
index 4d9ca06..1a2ac3e 100644
--- a/navigator/navigator.py
+++ b/navigator/navigator.py
@@ -109,13 +109,13 @@ def __init__(
                 cls = import_module("navigator.handlers.types", package=default_handler)
                 app_obj = getattr(cls, default_handler)
                 # create an instance of AppHandler
-                self.handler: BaseAppHandler = app_obj(Context, evt=self._loop, **kwargs)
+                self.handler: BaseAppHandler = app_obj(Context, evt=self._loop)
             except ImportError as ex:
                 raise NavException(
                     f"Cannot Import default App Handler {default_handler}: {ex}"
                 ) from ex
         else:
             self.handler: BaseAppHandler = handler(Context, evt=self._loop)
 
     def setup_app(self) -> WebApp:
         app = self.handler.app

From 25863bb9e2b2f926ac1c1a6e284c172e78a027d4 Mon Sep 17 00:00:00 2001
From: Jesus Lara
Date: Mon, 2 Dec 2024 03:48:49 +0100
Subject: [PATCH 3/3] new infra for message brokers

---
 navigator/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/navigator/version.py b/navigator/version.py
index f2b4ea3..e1ecbdb 100644
--- a/navigator/version.py
+++ b/navigator/version.py
@@ -4,7 +4,7 @@
 __description__ = (
     "Navigator Web Framework based on aiohttp, "
     "with batteries included."
 )
-__version__ = "2.12.3"
+__version__ = "2.12.4"
 __author__ = "Jesus Lara"
 __author_email__ = "jesuslarag@gmail.com"
 __license__ = "BSD"
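[Reviewer note, not part of the patch: a quick sanity check of the new settings
in navigator/conf.py. The printed values assume the fallbacks above; note that
REDIS_BROKER_URL does not embed REDIS_BROKER_PASSWORD, which is exposed
separately.]

    from navigator.conf import (
        rabbitmq_dsn,
        REDIS_BROKER_URL,
        BROKER_MANAGER_QUEUE_SIZE,
    )

    print(rabbitmq_dsn)               # amqp://guest:guest@localhost:5672/navigator
    print(REDIS_BROKER_URL)           # redis://<CACHE_HOST>:<CACHE_PORT>/<CACHE_DB>
    print(BROKER_MANAGER_QUEUE_SIZE)  # 4 unless overridden in the environment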