Skip to content

Commit

Permalink
Quantity ABC try 2, part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
llucax committed Dec 18, 2023
1 parent 448eddf commit c9a525a
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 608 deletions.
6 changes: 3 additions & 3 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#
# pylint: disable=import-outside-toplevel
if typing.TYPE_CHECKING:
from ..actor import ComponentMetricRequest, ResamplerConfig, _power_managing
from ..actor import ComponentMetricRequest, ResamplingActorConfig, _power_managing
from ..actor.power_distributing import ( # noqa: F401 (imports used by string type hints)
ComponentPoolStatus,
PowerDistributingActor,
Expand Down Expand Up @@ -78,7 +78,7 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes

def __init__(
self,
resampler_config: ResamplerConfig[float],
resampler_config: ResamplingActorConfig,
) -> None:
"""Create a `DataPipeline` instance.
Expand Down Expand Up @@ -384,7 +384,7 @@ async def _stop(self) -> None:
_DATA_PIPELINE: _DataPipeline | None = None


async def initialize(resampler_config: ResamplerConfig[float]) -> None:
async def initialize(resampler_config: ResamplingActorConfig) -> None:
"""Initialize a `DataPipeline` instance.
Args:
Expand Down
68 changes: 26 additions & 42 deletions src/frequenz/sdk/timeseries/_base_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,22 @@
import dataclasses
import enum
import functools
import typing
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Generic, Protocol, Self, SupportsFloat, overload
from typing import Generic, Self, SupportsFloat, TypeVar, overload

from ._quantities import Power

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
"""The UNIX epoch (in UTC)."""


class Comparable(Protocol):
def __lt__(self, other: Self) -> bool:
...

def __gt__(self, other: Self) -> bool:
...

def __le__(self, other: Self) -> bool:
...

def __ge__(self, other: Self) -> bool:
...


_T = typing.TypeVar("_T")
SupportsFloatT = typing.TypeVar("SupportsFloatT", bound=SupportsFloat)
SupportsFloatT = TypeVar("SupportsFloatT", bound=SupportsFloat)
"""Type variable for types that support conversion to float."""

ComparableT = typing.TypeVar("ComparableT", bound=Comparable)
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
"""The UNIX epoch (in UTC)."""


@dataclass(frozen=True, order=True)
class Sample(Generic[_T]):
class Sample(Generic[SupportsFloatT]):
"""A measurement taken at a particular point in time.
The `value` could be `None` if a component is malfunctioning or data is
Expand All @@ -51,12 +32,12 @@ class Sample(Generic[_T]):
timestamp: datetime
"""The time when this sample was generated."""

value: _T | None = None
value: SupportsFloatT | None = None
"""The value of this sample."""


@dataclass(frozen=True)
class Sample3Phase(Generic[ComparableT]):
class Sample3Phase(Generic[SupportsFloatT]):
"""A 3-phase measurement made at a particular point in time.
Each of the `value` fields could be `None` if a component is malfunctioning
Expand All @@ -67,16 +48,16 @@ class Sample3Phase(Generic[ComparableT]):

timestamp: datetime
"""The time when this sample was generated."""
value_p1: ComparableT | None
value_p1: SupportsFloatT | None
"""The value of the 1st phase in this sample."""

value_p2: ComparableT | None
value_p2: SupportsFloatT | None
"""The value of the 2nd phase in this sample."""

value_p3: ComparableT | None
value_p3: SupportsFloatT | None
"""The value of the 3rd phase in this sample."""

def __iter__(self) -> Iterator[ComparableT | None]:
def __iter__(self) -> Iterator[SupportsFloatT | None]:
"""Return an iterator that yields values from each of the phases.
Yields:
Expand All @@ -87,14 +68,14 @@ def __iter__(self) -> Iterator[ComparableT | None]:
yield self.value_p3

@overload
def max(self, default: ComparableT) -> ComparableT:
def max(self, default: SupportsFloatT) -> SupportsFloatT:
...

@overload
def max(self, default: None = None) -> ComparableT | None:
def max(self, default: None = None) -> SupportsFloatT | None:
...

def max(self, default: ComparableT | None = None) -> ComparableT | None:
def max(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None:
"""Return the max value among all phases, or default if they are all `None`.
Args:
Expand All @@ -105,21 +86,21 @@ def max(self, default: ComparableT | None = None) -> ComparableT | None:
"""
if not any(self):
return default
value: ComparableT = functools.reduce(
lambda x, y: x if x > y else y,
value: SupportsFloatT = functools.reduce(
lambda x, y: x if float(x) > float(y) else y,
filter(None, self),
)
return value

@overload
def min(self, default: ComparableT) -> ComparableT:
def min(self, default: SupportsFloatT) -> SupportsFloatT:
...

@overload
def min(self, default: None = None) -> ComparableT | None:
def min(self, default: None = None) -> SupportsFloatT | None:
...

def min(self, default: ComparableT | None = None) -> ComparableT | None:
def min(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None:
"""Return the min value among all phases, or default if they are all `None`.
Args:
Expand All @@ -130,16 +111,16 @@ def min(self, default: ComparableT | None = None) -> ComparableT | None:
"""
if not any(self):
return default
value: ComparableT = functools.reduce(
lambda x, y: x if x < y else y,
value: SupportsFloatT = functools.reduce(
lambda x, y: x if float(x) < float(y) else y,
filter(None, self),
)
return value

def map(
self,
function: Callable[[ComparableT], ComparableT],
default: ComparableT | None = None,
function: Callable[[SupportsFloatT], SupportsFloatT],
default: SupportsFloatT | None = None,
) -> Self:
"""Apply the given function on each of the phase values and return the result.
Expand All @@ -161,6 +142,9 @@ def map(
)


_T = TypeVar("_T")


@dataclass(frozen=True)
class Bounds(Generic[_T]):
"""Lower and upper bound values."""
Expand Down
21 changes: 11 additions & 10 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
from numpy.typing import ArrayLike

from ..actor._background_service import BackgroundService
from ._base_types import UNIX_EPOCH, Sample
from ._quantities import QuantityT
from ._base_types import UNIX_EPOCH, Sample, SupportsFloatT
from ._resampling import Resampler, ResamplerConfig
from ._ringbuffer import OrderedRingBuffer

_logger = logging.getLogger(__name__)


class MovingWindow(BackgroundService, Generic[QuantityT]):
class MovingWindow(BackgroundService, Generic[SupportsFloatT]):
"""
A data window that moves with the latest datapoints of a data stream.
Expand Down Expand Up @@ -130,9 +129,9 @@ async def run() -> None:
def __init__( # pylint: disable=too-many-arguments
self,
size: timedelta,
resampled_data_recv: Receiver[Sample[QuantityT]],
resampled_data_recv: Receiver[Sample[SupportsFloatT]],
input_sampling_period: timedelta,
resampler_config: ResamplerConfig[QuantityT] | None = None,
resampler_config: ResamplerConfig[SupportsFloatT] | None = None,
align_to: datetime = UNIX_EPOCH,
*,
name: str | None = None,
Expand Down Expand Up @@ -166,8 +165,8 @@ def __init__( # pylint: disable=too-many-arguments

self._sampling_period = input_sampling_period

self._resampler: Resampler[QuantityT] | None = None
self._resampler_sender: Sender[Sample[QuantityT]] | None = None
self._resampler: Resampler[SupportsFloatT] | None = None
self._resampler_sender: Sender[Sample[SupportsFloatT]] | None = None

if resampler_config:
assert (
Expand All @@ -182,7 +181,9 @@ def __init__( # pylint: disable=too-many-arguments
size.total_seconds() / self._sampling_period.total_seconds()
)

self._resampled_data_recv: Receiver[Sample[QuantityT]] = resampled_data_recv
self._resampled_data_recv: Receiver[
Sample[SupportsFloatT]
] = resampled_data_recv
self._buffer = OrderedRingBuffer(
np.empty(shape=num_samples, dtype=float),
sampling_period=self._sampling_period,
Expand Down Expand Up @@ -341,11 +342,11 @@ def _configure_resampler(self) -> None:
"""Configure the components needed to run the resampler."""
assert self._resampler is not None

async def sink_buffer(sample: Sample[QuantityT]) -> None:
async def sink_buffer(sample: Sample[SupportsFloatT]) -> None:
if sample.value is not None:
self._buffer.update(sample)

resampler_channel = Broadcast[Sample[QuantityT]]("average")
resampler_channel = Broadcast[Sample[SupportsFloatT]]("average")
self._resampler_sender = resampler_channel.new_sender()
self._resampler.add_timeseries(
"avg",
Expand Down
8 changes: 4 additions & 4 deletions src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from numpy.typing import NDArray

from .._internal._math import is_close_to_zero
from ..timeseries._quantities import QuantityT
from ._moving_window import MovingWindow
from ._quantities import SupportsFloatT
from ._ringbuffer import OrderedRingBuffer

_logger = logging.getLogger(__name__)
Expand All @@ -50,7 +50,7 @@ class RelativePositions:
"""The relative position of the next incoming sample."""


class PeriodicFeatureExtractor(Generic[QuantityT]):
class PeriodicFeatureExtractor(Generic[SupportsFloatT]):
"""
A feature extractor for historical timeseries data.
Expand Down Expand Up @@ -108,7 +108,7 @@ class PeriodicFeatureExtractor(Generic[QuantityT]):

def __init__(
self,
moving_window: MovingWindow[QuantityT],
moving_window: MovingWindow[SupportsFloatT],
period: timedelta,
) -> None:
"""
Expand All @@ -121,7 +121,7 @@ def __init__(
Raises:
ValueError: If the MovingWindow size is not a integer multiple of the period.
"""
self._moving_window: MovingWindow[QuantityT] = moving_window
self._moving_window: MovingWindow[SupportsFloatT] = moving_window

self._sampling_period = self._moving_window.sampling_period
"""The sampling_period as float to use it for indexing of samples."""
Expand Down
Loading

0 comments on commit c9a525a

Please sign in to comment.