-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Sahas Subramanian <[email protected]>
- Loading branch information
Showing
3 changed files
with
115 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# License: MIT | ||
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""Implementation of the streaming helper.""" | ||
|
||
from ._grpc_streaming_helper import GrpcStreamingHelper | ||
|
||
__all__ = ["GrpcStreamingHelper"] |
103 changes: 103 additions & 0 deletions
103
src/frequenz/client/base/grpc_streaming_helper/_grpc_streaming_helper.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# License: MIT | ||
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""Implementation of the grpc streaming helper.""" | ||
|
||
import asyncio | ||
import logging | ||
import typing | ||
|
||
import grpc | ||
from grpc.aio import UnaryStreamCall # type: ignore[import] | ||
|
||
from frequenz import channels | ||
|
||
from .. import retry_strategy | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
_InputT = typing.TypeVar("_InputT") | ||
_OutputT = typing.TypeVar("_OutputT") | ||
|
||
|
||
class GrpcStreamingHelper(typing.Generic[_InputT, _OutputT]): | ||
"""Helper class to handle grpc streaming methods.""" | ||
|
||
def __init__( | ||
self, | ||
stream_name: str, | ||
stream_method: typing.Callable[[], UnaryStreamCall[typing.Any, _InputT]], | ||
transform: typing.Callable[[_InputT], _OutputT], | ||
retry_spec: retry_strategy.RetryStrategy | None = None, | ||
): | ||
"""Initialize the streaming helper. | ||
Args: | ||
stream_name: The name of the streaming method. | ||
stream_method: The streaming method. | ||
transform: A function to transform the input type to the output type. | ||
retry_spec: The retry strategy to use. | ||
""" | ||
self._stream_name = stream_name | ||
self._stream_method = stream_method | ||
self._transform = transform | ||
self._retry_spec = ( | ||
retry_strategy.LinearBackoff() if retry_spec is None else retry_spec.copy() | ||
) | ||
|
||
self._channel: channels.Broadcast[_OutputT] = channels.Broadcast( | ||
f"GrpcStreamingHelper-{stream_name}" | ||
) | ||
self._task = asyncio.create_task(self._run()) | ||
|
||
def new_receiver(self, maxsize: int = 50) -> channels.Receiver[_OutputT]: | ||
"""Create a new receiver for the stream. | ||
Returns: | ||
A new receiver. | ||
""" | ||
return self._channel.new_receiver(maxsize=maxsize) | ||
|
||
async def stop(self) -> None: | ||
"""Stop the streaming helper.""" | ||
if self._task.done(): | ||
return | ||
self._task.cancel() | ||
try: | ||
await self._task | ||
except asyncio.CancelledError: | ||
pass | ||
await self._channel.close() | ||
|
||
async def _run(self) -> None: | ||
"""Run the streaming helper.""" | ||
sender = self._channel.new_sender() | ||
|
||
while True: | ||
_logger.debug("Making call to grpc streaming method: %s", self._stream_name) | ||
|
||
try: | ||
call = self._stream_method() | ||
async for msg in call: | ||
await sender.send(self._transform(msg)) | ||
except grpc.aio.AioRpcError: | ||
_logger.exception( | ||
"Error in grpc streaming method: %s", self._stream_name | ||
) | ||
if interval := self._retry_spec.next_interval(): | ||
_logger.warning( | ||
"`%s`, connection ended, retrying %s in %0.3f seconds.", | ||
self._stream_name, | ||
self._retry_spec.get_progress(), | ||
interval, | ||
) | ||
await asyncio.sleep(interval) | ||
else: | ||
_logger.warning( | ||
"`%s`, connection ended, retry limit exceeded %s.", | ||
self._stream_name, | ||
self._retry_spec.get_progress(), | ||
) | ||
await self._channel.close() | ||
break |