From c40c7f9fe34eae5f7842735176daa655ad7315dc Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 15 Dec 2023 11:04:31 +0100 Subject: [PATCH] Add a grpc streaming helper Signed-off-by: Sahas Subramanian --- pyproject.toml | 4 + .../base/grpc_streaming_helper/__init__.py | 8 ++ .../_grpc_streaming_helper.py | 103 ++++++++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 src/frequenz/client/base/grpc_streaming_helper/__init__.py create mode 100644 src/frequenz/client/base/grpc_streaming_helper/_grpc_streaming_helper.py diff --git a/pyproject.toml b/pyproject.toml index f76af1c..31f9a0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,9 @@ requires-python = ">= 3.11, < 4" # TODO(cookiecutter): Remove and add more dependencies if appropriate dependencies = [ "typing-extensions >= 4.5.0, < 5", + "grpcio >= 1.54.2, < 2", + "grpcio-tools >= 1.54.2, < 2", + "frequenz-channels == 1.0.0b2", ] dynamic = ["version"] @@ -60,6 +63,7 @@ dev-mkdocs = [ dev-mypy = [ "mypy == 1.5.1", "types-Markdown == 3.4.2.10", + "grpc-stubs == 1.24.12", # This dependency introduces breaking changes in patch releases # For checking the noxfile, docs/ script, and tests "frequenz-client-base[dev-mkdocs,dev-noxfile,dev-pytest]", ] diff --git a/src/frequenz/client/base/grpc_streaming_helper/__init__.py b/src/frequenz/client/base/grpc_streaming_helper/__init__.py new file mode 100644 index 0000000..8b210ad --- /dev/null +++ b/src/frequenz/client/base/grpc_streaming_helper/__init__.py @@ -0,0 +1,8 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Implementation of the streaming helper.""" + +from ._grpc_streaming_helper import GrpcStreamingHelper + +__all__ = ["GrpcStreamingHelper"] diff --git a/src/frequenz/client/base/grpc_streaming_helper/_grpc_streaming_helper.py b/src/frequenz/client/base/grpc_streaming_helper/_grpc_streaming_helper.py new file mode 100644 index 0000000..3b9712b --- /dev/null +++ b/src/frequenz/client/base/grpc_streaming_helper/_grpc_streaming_helper.py @@ -0,0 +1,103 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Implementation of the grpc streaming helper.""" + +import asyncio +import logging +import typing + +import grpc +from grpc.aio import UnaryStreamCall # type: ignore[import] + +from frequenz import channels + +from .. import retry_strategy + +_logger = logging.getLogger(__name__) + + +_InputT = typing.TypeVar("_InputT") +_OutputT = typing.TypeVar("_OutputT") + + +class GrpcStreamingHelper(typing.Generic[_InputT, _OutputT]): + """Helper class to handle grpc streaming methods.""" + + def __init__( + self, + stream_name: str, + stream_method: typing.Callable[[], UnaryStreamCall[typing.Any, _InputT]], + transform: typing.Callable[[_InputT], _OutputT], + retry_spec: retry_strategy.RetryStrategy | None = None, + ): + """Initialize the streaming helper. + + Args: + stream_name: The name of the streaming method. + stream_method: The streaming method. + transform: A function to transform the input type to the output type. + retry_spec: The retry strategy to use. + """ + self._stream_name = stream_name + self._stream_method = stream_method + self._transform = transform + self._retry_spec = ( + retry_strategy.LinearBackoff() if retry_spec is None else retry_spec.copy() + ) + + self._channel: channels.Broadcast[_OutputT] = channels.Broadcast( + f"GrpcStreamingHelper-{stream_name}" + ) + self._task = asyncio.create_task(self._run()) + + def new_receiver(self, maxsize: int = 50) -> channels.Receiver[_OutputT]: + """Create a new receiver for the stream. + + Returns: + A new receiver. + """ + return self._channel.new_receiver(maxsize=maxsize) + + async def stop(self) -> None: + """Stop the streaming helper.""" + if self._task.done(): + return + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + await self._channel.close() + + async def _run(self) -> None: + """Run the streaming helper.""" + sender = self._channel.new_sender() + + while True: + _logger.debug("Making call to grpc streaming method: %s", self._stream_name) + + try: + call = self._stream_method() + async for msg in call: + await sender.send(self._transform(msg)) + except grpc.aio.AioRpcError: + _logger.exception( + "Error in grpc streaming method: %s", self._stream_name + ) + if interval := self._retry_spec.next_interval(): + _logger.warning( + "`%s`, connection ended, retrying %s in %0.3f seconds.", + self._stream_name, + self._retry_spec.get_progress(), + interval, + ) + await asyncio.sleep(interval) + else: + _logger.warning( + "`%s`, connection ended, retry limit exceeded %s.", + self._stream_name, + self._retry_spec.get_progress(), + ) + await self._channel.close() + break