diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0749bd3..860f844 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,7 +11,8 @@ ## New Features * Functions to convert to `datetime` and protobufs `Timestamp` have been added. +* The generated documentation was improved to include information on defaults and generic parameters. ## Bug Fixes - +* When copying `RetryStrategy`s, the type now will be correctly set to the original type. diff --git a/src/frequenz/client/base/grpc_streaming_helper.py b/src/frequenz/client/base/grpc_streaming_helper.py index 95afe0a..8fa7faa 100644 --- a/src/frequenz/client/base/grpc_streaming_helper.py +++ b/src/frequenz/client/base/grpc_streaming_helper.py @@ -5,7 +5,7 @@ import asyncio import logging -import typing +from typing import Any, Callable, Generic, TypeVar import grpc from grpc.aio import UnaryStreamCall @@ -17,18 +17,21 @@ _logger = logging.getLogger(__name__) -_InputT = typing.TypeVar("_InputT") -_OutputT = typing.TypeVar("_OutputT") +InputT = TypeVar("InputT") +"""The input type of the stream.""" +OutputT = TypeVar("OutputT") +"""The output type of the stream.""" -class GrpcStreamingHelper(typing.Generic[_InputT, _OutputT]): + +class GrpcStreamingHelper(Generic[InputT, OutputT]): """Helper class to handle grpc streaming methods.""" def __init__( self, stream_name: str, - stream_method: typing.Callable[[], UnaryStreamCall[typing.Any, _InputT]], - transform: typing.Callable[[_InputT], _OutputT], + stream_method: Callable[[], UnaryStreamCall[Any, InputT]], + transform: Callable[[InputT], OutputT], retry_spec: retry_strategy.RetryStrategy | None = None, ): """Initialize the streaming helper. @@ -48,12 +51,12 @@ def __init__( retry_strategy.LinearBackoff() if retry_spec is None else retry_spec.copy() ) - self._channel: channels.Broadcast[_OutputT] = channels.Broadcast( + self._channel: channels.Broadcast[OutputT] = channels.Broadcast( f"GrpcStreamingHelper-{stream_name}" ) self._task = asyncio.create_task(self._run()) - def new_receiver(self, maxsize: int = 50) -> channels.Receiver[_OutputT]: + def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]: """Create a new receiver for the stream. Args: diff --git a/src/frequenz/client/base/retry_strategy.py b/src/frequenz/client/base/retry_strategy.py index 4eb19e0..e4bd626 100644 --- a/src/frequenz/client/base/retry_strategy.py +++ b/src/frequenz/client/base/retry_strategy.py @@ -3,17 +3,16 @@ """Implementations for retry strategies.""" -from __future__ import annotations - import random from abc import ABC, abstractmethod from collections.abc import Iterator from copy import deepcopy +from typing import Self -_DEFAULT_RETRY_INTERVAL = 3.0 +DEFAULT_RETRY_INTERVAL = 3.0 """Default retry interval, in seconds.""" -_DEFAULT_RETRY_JITTER = 1.0 +DEFAULT_RETRY_JITTER = 1.0 """Default retry jitter, in seconds.""" @@ -52,7 +51,7 @@ def reset(self) -> None: """ self._count = 0 - def copy(self) -> RetryStrategy: + def copy(self) -> Self: """Create a new instance of `self`. Returns: @@ -80,8 +79,8 @@ class LinearBackoff(RetryStrategy): def __init__( self, - interval: float = _DEFAULT_RETRY_INTERVAL, - jitter: float = _DEFAULT_RETRY_JITTER, + interval: float = DEFAULT_RETRY_INTERVAL, + jitter: float = DEFAULT_RETRY_JITTER, limit: int | None = None, ) -> None: """Create a `LinearBackoff` instance. @@ -116,7 +115,7 @@ def next_interval(self) -> float | None: class ExponentialBackoff(RetryStrategy): """Provides methods for calculating the exponential interval between retries.""" - DEFAULT_INTERVAL = _DEFAULT_RETRY_INTERVAL + DEFAULT_INTERVAL = DEFAULT_RETRY_INTERVAL """Default retry interval, in seconds.""" DEFAULT_MAX_INTERVAL = 60.0 @@ -131,7 +130,7 @@ def __init__( initial_interval: float = DEFAULT_INTERVAL, max_interval: float = DEFAULT_MAX_INTERVAL, multiplier: float = DEFAULT_MULTIPLIER, - jitter: float = _DEFAULT_RETRY_JITTER, + jitter: float = DEFAULT_RETRY_JITTER, limit: int | None = None, ) -> None: """Create a `ExponentialBackoff` instance.