Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve docs and fix copy() type #23

Merged
merged 5 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
## New Features

* Functions to convert to `datetime` and protobufs `Timestamp` have been added.
* The generated documentation was improved to include information on defaults and generic parameters.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
* When copying `RetryStrategy`s, the type now will be correctly set to the original type.
19 changes: 11 additions & 8 deletions src/frequenz/client/base/grpc_streaming_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import asyncio
import logging
import typing
from typing import Any, Callable, Generic, TypeVar

import grpc
from grpc.aio import UnaryStreamCall
Expand All @@ -17,18 +17,21 @@
_logger = logging.getLogger(__name__)


_InputT = typing.TypeVar("_InputT")
_OutputT = typing.TypeVar("_OutputT")
InputT = TypeVar("InputT")
"""The input type of the stream."""

OutputT = TypeVar("OutputT")
"""The output type of the stream."""

class GrpcStreamingHelper(typing.Generic[_InputT, _OutputT]):

class GrpcStreamingHelper(Generic[InputT, OutputT]):
"""Helper class to handle grpc streaming methods."""

def __init__(
self,
stream_name: str,
stream_method: typing.Callable[[], UnaryStreamCall[typing.Any, _InputT]],
transform: typing.Callable[[_InputT], _OutputT],
stream_method: Callable[[], UnaryStreamCall[Any, InputT]],
transform: Callable[[InputT], OutputT],
retry_spec: retry_strategy.RetryStrategy | None = None,
):
"""Initialize the streaming helper.
Expand All @@ -48,12 +51,12 @@ def __init__(
retry_strategy.LinearBackoff() if retry_spec is None else retry_spec.copy()
)

self._channel: channels.Broadcast[_OutputT] = channels.Broadcast(
self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
f"GrpcStreamingHelper-{stream_name}"
)
self._task = asyncio.create_task(self._run())

def new_receiver(self, maxsize: int = 50) -> channels.Receiver[_OutputT]:
def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]:
"""Create a new receiver for the stream.

Args:
Expand Down
17 changes: 8 additions & 9 deletions src/frequenz/client/base/retry_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@

"""Implementations for retry strategies."""

from __future__ import annotations

import random
from abc import ABC, abstractmethod
from collections.abc import Iterator
from copy import deepcopy
from typing import Self

_DEFAULT_RETRY_INTERVAL = 3.0
DEFAULT_RETRY_INTERVAL = 3.0
"""Default retry interval, in seconds."""

_DEFAULT_RETRY_JITTER = 1.0
DEFAULT_RETRY_JITTER = 1.0
"""Default retry jitter, in seconds."""


Expand Down Expand Up @@ -52,7 +51,7 @@ def reset(self) -> None:
"""
self._count = 0

def copy(self) -> RetryStrategy:
def copy(self) -> Self:
"""Create a new instance of `self`.
Returns:
Expand Down Expand Up @@ -80,8 +79,8 @@ class LinearBackoff(RetryStrategy):

def __init__(
self,
interval: float = _DEFAULT_RETRY_INTERVAL,
jitter: float = _DEFAULT_RETRY_JITTER,
interval: float = DEFAULT_RETRY_INTERVAL,
jitter: float = DEFAULT_RETRY_JITTER,
limit: int | None = None,
) -> None:
"""Create a `LinearBackoff` instance.
Expand Down Expand Up @@ -116,7 +115,7 @@ def next_interval(self) -> float | None:
class ExponentialBackoff(RetryStrategy):
"""Provides methods for calculating the exponential interval between retries."""

DEFAULT_INTERVAL = _DEFAULT_RETRY_INTERVAL
DEFAULT_INTERVAL = DEFAULT_RETRY_INTERVAL
"""Default retry interval, in seconds."""

DEFAULT_MAX_INTERVAL = 60.0
Expand All @@ -131,7 +130,7 @@ def __init__(
initial_interval: float = DEFAULT_INTERVAL,
max_interval: float = DEFAULT_MAX_INTERVAL,
multiplier: float = DEFAULT_MULTIPLIER,
jitter: float = _DEFAULT_RETRY_JITTER,
jitter: float = DEFAULT_RETRY_JITTER,
limit: int | None = None,
) -> None:
"""Create a `ExponentialBackoff` instance.
Expand Down
Loading