Skip to content

Commit

Permalink
Fix GrpcStreamBroadcaster retry with a 0 interval (#52)
Browse files Browse the repository at this point in the history
When the retry interval returned by the retry strategy is 0,
`GrpcStreamBroadcaster` stops retrying, but it should keep going until
`None` is returned instead.
  • Loading branch information
llucax authored May 13, 2024
2 parents 3148c60 + 6cb9785 commit e43b075
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 20 deletions.
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- Fixed retrying for `GrpcStreamBroadcaster` when the retry interval is set to 0 (before it would stop retrying if the interval was set to 0).
20 changes: 10 additions & 10 deletions src/frequenz/client/base/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,8 @@ async def _run(self) -> None:
except (GrpcioError, GrpclibError) as err:
error = err
error_str = f"Error: {error}" if error else "Stream exhausted"
if interval := self._retry_strategy.next_interval():
_logger.warning(
"%s: connection ended, retrying %s in %0.3f seconds. %s.",
self._stream_name,
self._retry_strategy.get_progress(),
interval,
error_str,
)
await asyncio.sleep(interval)
else:
interval = self._retry_strategy.next_interval()
if interval is None:
_logger.error(
"%s: connection ended, retry limit exceeded (%s), giving up. %s.",
self._stream_name,
Expand All @@ -109,3 +101,11 @@ async def _run(self) -> None:
)
await self._channel.close()
break
_logger.warning(
"%s: connection ended, retrying %s in %0.3f seconds. %s.",
self._stream_name,
self._retry_strategy.get_progress(),
interval,
error_str,
)
await asyncio.sleep(interval)
92 changes: 83 additions & 9 deletions tests/streaming/test_grpc_stream_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from unittest.mock import MagicMock
from unittest import mock

import grpc.aio
import grpclib
Expand All @@ -28,9 +28,9 @@ def receiver_ready_event() -> asyncio.Event:


@pytest.fixture
def no_retry() -> MagicMock:
def no_retry() -> mock.MagicMock:
"""Fixture for mocked, non-retrying retry strategy."""
mock_retry = MagicMock(spec=retry.Strategy)
mock_retry = mock.MagicMock(spec=retry.Strategy)
mock_retry.next_interval.return_value = None
mock_retry.copy.return_value = mock_retry
mock_retry.get_progress.return_value = "mock progress"
Expand All @@ -39,7 +39,7 @@ def no_retry() -> MagicMock:

@pytest.fixture
async def ok_helper(
no_retry: MagicMock, # pylint: disable=redefined-outer-name
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
) -> AsyncIterator[streaming.GrpcStreamBroadcaster[int, str]]:
"""Fixture for GrpcStreamBroadcaster."""
Expand Down Expand Up @@ -84,7 +84,7 @@ async def test_streaming_success(
ok_helper: streaming.GrpcStreamBroadcaster[
int, str
], # pylint: disable=redefined-outer-name
no_retry: MagicMock, # pylint: disable=redefined-outer-name
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
Expand Down Expand Up @@ -114,7 +114,7 @@ async def test_streaming_success(
]


class _NamedMagicMock(MagicMock):
class _NamedMagicMock(mock.MagicMock):
"""Mock with a name."""

def __str__(self) -> str:
Expand All @@ -131,8 +131,8 @@ def __repr__(self) -> str:
(
grpc.aio.AioRpcError(
code=_NamedMagicMock(name="mock grpcio code"),
initial_metadata=MagicMock(),
trailing_metadata=MagicMock(),
initial_metadata=mock.MagicMock(),
trailing_metadata=mock.MagicMock(),
details="mock details",
debug_error_string="mock debug_error_string",
),
Expand All @@ -156,7 +156,7 @@ def __repr__(self) -> str:
async def test_streaming_error( # pylint: disable=too-many-arguments
successes: int,
error_spec: tuple[Exception, str],
no_retry: MagicMock, # pylint: disable=redefined-outer-name
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
Expand Down Expand Up @@ -196,3 +196,77 @@ async def test_streaming_error( # pylint: disable=too-many-arguments
f"giving up. Error: {expected_error_str}.",
),
]


@pytest.mark.parametrize(
"error_spec",
[
(
grpc.aio.AioRpcError(
code=_NamedMagicMock(name="mock grpcio code"),
initial_metadata=mock.MagicMock(),
trailing_metadata=mock.MagicMock(),
details="mock details",
debug_error_string="mock debug_error_string",
),
"<AioRpcError of RPC that terminated with:\n"
"\tstatus = mock grpcio code\n"
'\tdetails = "mock details"\n'
'\tdebug_error_string = "mock debug_error_string"\n'
">",
),
(
grpclib.GRPCError(
status=_NamedMagicMock(name="mock grpclib status"),
message="mock grpclib error",
details="mock grpclib details",
),
"(mock grpclib status, 'mock grpclib error', 'mock grpclib details')",
),
],
ids=["grpcio", "grpclib"],
)
async def test_retry_next_interval_zero( # pylint: disable=too-many-arguments
error_spec: tuple[Exception, str],
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test retry logic when next_interval returns 0."""
caplog.set_level(logging.WARNING)
error, expected_error_str = error_spec
mock_retry = mock.MagicMock(spec=retry.Strategy)
mock_retry.next_interval.side_effect = [0, None]
mock_retry.copy.return_value = mock_retry
mock_retry.get_progress.return_value = "mock progress"
helper = streaming.GrpcStreamBroadcaster(
stream_name="test_helper",
stream_method=lambda: _ErroringAsyncIter(error, receiver_ready_event),
transform=_transformer,
retry_strategy=mock_retry,
)

items: list[str] = []
async with AsyncExitStack() as stack:
stack.push_async_callback(helper.stop)

receiver = helper.new_receiver()
receiver_ready_event.set()
async for item in receiver:
items.append(item)

assert not items
assert mock_retry.next_interval.mock_calls == [mock.call(), mock.call()]
assert caplog.record_tuples == [
(
"frequenz.client.base.streaming",
logging.WARNING,
"test_helper: connection ended, retrying mock progress in 0.000 "
f"seconds. Error: {expected_error_str}.",
),
(
"frequenz.client.base.streaming",
logging.ERROR,
"test_helper: connection ended, retry limit exceeded (mock progress), "
f"giving up. Error: {expected_error_str}.",
),
]

0 comments on commit e43b075

Please sign in to comment.