diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c70a442..316a2bb 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +- Fixed retrying for `GrpcStreamBroadcaster` when the retry interval is set to 0 (before it would stop retrying if the interval was set to 0). diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index afcb453..3ec1ca6 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -91,16 +91,8 @@ async def _run(self) -> None: except (GrpcioError, GrpclibError) as err: error = err error_str = f"Error: {error}" if error else "Stream exhausted" - if interval := self._retry_strategy.next_interval(): - _logger.warning( - "%s: connection ended, retrying %s in %0.3f seconds. %s.", - self._stream_name, - self._retry_strategy.get_progress(), - interval, - error_str, - ) - await asyncio.sleep(interval) - else: + interval = self._retry_strategy.next_interval() + if interval is None: _logger.error( "%s: connection ended, retry limit exceeded (%s), giving up. %s.", self._stream_name, @@ -109,3 +101,11 @@ async def _run(self) -> None: ) await self._channel.close() break + _logger.warning( + "%s: connection ended, retrying %s in %0.3f seconds. %s.", + self._stream_name, + self._retry_strategy.get_progress(), + interval, + error_str, + ) + await asyncio.sleep(interval) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index ad12c31..a59f60f 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -7,7 +7,7 @@ import logging from collections.abc import AsyncIterator from contextlib import AsyncExitStack -from unittest.mock import MagicMock +from unittest import mock import grpc.aio import grpclib @@ -28,9 +28,9 @@ def receiver_ready_event() -> asyncio.Event: @pytest.fixture -def no_retry() -> MagicMock: +def no_retry() -> mock.MagicMock: """Fixture for mocked, non-retrying retry strategy.""" - mock_retry = MagicMock(spec=retry.Strategy) + mock_retry = mock.MagicMock(spec=retry.Strategy) mock_retry.next_interval.return_value = None mock_retry.copy.return_value = mock_retry mock_retry.get_progress.return_value = "mock progress" @@ -39,7 +39,7 @@ def no_retry() -> MagicMock: @pytest.fixture async def ok_helper( - no_retry: MagicMock, # pylint: disable=redefined-outer-name + no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name ) -> AsyncIterator[streaming.GrpcStreamBroadcaster[int, str]]: """Fixture for GrpcStreamBroadcaster.""" @@ -84,7 +84,7 @@ async def test_streaming_success( ok_helper: streaming.GrpcStreamBroadcaster[ int, str ], # pylint: disable=redefined-outer-name - no_retry: MagicMock, # pylint: disable=redefined-outer-name + no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name caplog: pytest.LogCaptureFixture, ) -> None: @@ -114,7 +114,7 @@ async def test_streaming_success( ] -class _NamedMagicMock(MagicMock): +class _NamedMagicMock(mock.MagicMock): """Mock with a name.""" def __str__(self) -> str: @@ -131,8 +131,8 @@ def __repr__(self) -> str: ( grpc.aio.AioRpcError( code=_NamedMagicMock(name="mock grpcio code"), - initial_metadata=MagicMock(), - trailing_metadata=MagicMock(), + initial_metadata=mock.MagicMock(), + trailing_metadata=mock.MagicMock(), details="mock details", debug_error_string="mock debug_error_string", ), @@ -156,7 +156,7 @@ def __repr__(self) -> str: async def test_streaming_error( # pylint: disable=too-many-arguments successes: int, error_spec: tuple[Exception, str], - no_retry: MagicMock, # pylint: disable=redefined-outer-name + no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name caplog: pytest.LogCaptureFixture, ) -> None: @@ -196,3 +196,77 @@ async def test_streaming_error( # pylint: disable=too-many-arguments f"giving up. Error: {expected_error_str}.", ), ] + + +@pytest.mark.parametrize( + "error_spec", + [ + ( + grpc.aio.AioRpcError( + code=_NamedMagicMock(name="mock grpcio code"), + initial_metadata=mock.MagicMock(), + trailing_metadata=mock.MagicMock(), + details="mock details", + debug_error_string="mock debug_error_string", + ), + "", + ), + ( + grpclib.GRPCError( + status=_NamedMagicMock(name="mock grpclib status"), + message="mock grpclib error", + details="mock grpclib details", + ), + "(mock grpclib status, 'mock grpclib error', 'mock grpclib details')", + ), + ], + ids=["grpcio", "grpclib"], +) +async def test_retry_next_interval_zero( # pylint: disable=too-many-arguments + error_spec: tuple[Exception, str], + receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name + caplog: pytest.LogCaptureFixture, +) -> None: + """Test retry logic when next_interval returns 0.""" + caplog.set_level(logging.WARNING) + error, expected_error_str = error_spec + mock_retry = mock.MagicMock(spec=retry.Strategy) + mock_retry.next_interval.side_effect = [0, None] + mock_retry.copy.return_value = mock_retry + mock_retry.get_progress.return_value = "mock progress" + helper = streaming.GrpcStreamBroadcaster( + stream_name="test_helper", + stream_method=lambda: _ErroringAsyncIter(error, receiver_ready_event), + transform=_transformer, + retry_strategy=mock_retry, + ) + + items: list[str] = [] + async with AsyncExitStack() as stack: + stack.push_async_callback(helper.stop) + + receiver = helper.new_receiver() + receiver_ready_event.set() + async for item in receiver: + items.append(item) + + assert not items + assert mock_retry.next_interval.mock_calls == [mock.call(), mock.call()] + assert caplog.record_tuples == [ + ( + "frequenz.client.base.streaming", + logging.WARNING, + "test_helper: connection ended, retrying mock progress in 0.000 " + f"seconds. Error: {expected_error_str}.", + ), + ( + "frequenz.client.base.streaming", + logging.ERROR, + "test_helper: connection ended, retry limit exceeded (mock progress), " + f"giving up. Error: {expected_error_str}.", + ), + ]