Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Make Quantity an abstract base class #822

Draft
wants to merge 17 commits into
base: v1.x.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/frequenz/sdk/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,13 +592,12 @@ async def main() -> None: # (6)!
[_run]: #the-_run-method
"""

from ..timeseries._resampling import ResamplerConfig
from ._actor import Actor
from ._background_service import BackgroundService
from ._channel_registry import ChannelRegistry
from ._config_managing import ConfigManagingActor
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
from ._resampling import ComponentMetricsResamplingActor
from ._resampling import ComponentMetricsResamplingActor, ResamplingActorConfig
from ._run_utils import run

__all__ = [
Expand All @@ -609,6 +608,6 @@ async def main() -> None: # (6)!
"ComponentMetricsResamplingActor",
"ConfigManagingActor",
"DataSourcingActor",
"ResamplerConfig",
"ResamplingActorConfig",
"run",
]
92 changes: 49 additions & 43 deletions src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
MeterData,
)
from ...timeseries import Sample
from ...timeseries._quantities import Quantity
from .._channel_registry import ChannelRegistry
from ._component_metric_request import ComponentMetricRequest

Expand Down Expand Up @@ -318,7 +317,7 @@ def _get_metric_senders(
self,
category: ComponentCategory,
requests: dict[ComponentMetricId, list[ComponentMetricRequest]],
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]:
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[float]]]]]:
"""Get channel senders from the channel registry for each requested metric.

Args:
Expand All @@ -335,7 +334,7 @@ def _get_metric_senders(
self._get_data_extraction_method(category, metric),
[
self._registry.get_or_create(
Sample[Quantity], request.get_channel_name()
Sample[float], request.get_channel_name()
).new_sender()
for request in req_list
],
Expand All @@ -353,48 +352,55 @@ async def _handle_data_stream(
Args:
comp_id: Id of the requested component.
category: The category of the component.

Raises:
Exception: if an error occurs while streaming data.
"""
stream_senders = []
if comp_id in self._req_streaming_metrics:
await self._check_requested_component_and_metrics(
comp_id, category, self._req_streaming_metrics[comp_id]
try:
stream_senders = []
if comp_id in self._req_streaming_metrics:
await self._check_requested_component_and_metrics(
comp_id, category, self._req_streaming_metrics[comp_id]
)
stream_senders = self._get_metric_senders(
category, self._req_streaming_metrics[comp_id]
)
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]

senders_done: asyncio.Event = asyncio.Event()
pending_messages = 0

def process_msg(data: Any) -> None:
tasks = []
for extractor, senders in stream_senders:
for sender in senders:
tasks.append(
sender.send(Sample(data.timestamp, extractor(data)))
)
asyncio.gather(*tasks)
nonlocal pending_messages
pending_messages -= 1
if pending_messages == 0:
senders_done.set()

async for data in api_data_receiver:
pending_messages += 1
senders_done.clear()
process_msg(data)

while pending_messages > 0:
await senders_done.wait()

await asyncio.gather(
*[
self._registry.close_and_remove(r.get_channel_name())
for requests in self._req_streaming_metrics[comp_id].values()
for r in requests
]
)
stream_senders = self._get_metric_senders(
category, self._req_streaming_metrics[comp_id]
)
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]

senders_done: asyncio.Event = asyncio.Event()
pending_messages = 0

def process_msg(data: Any) -> None:
tasks = []
for extractor, senders in stream_senders:
for sender in senders:
tasks.append(
sender.send(Sample(data.timestamp, Quantity(extractor(data))))
)
asyncio.gather(*tasks)
nonlocal pending_messages
pending_messages -= 1
if pending_messages == 0:
senders_done.set()

async for data in api_data_receiver:
pending_messages += 1
senders_done.clear()
process_msg(data)

while pending_messages > 0:
await senders_done.wait()

await asyncio.gather(
*[
self._registry.close_and_remove(r.get_channel_name())
for requests in self._req_streaming_metrics[comp_id].values()
for r in requests
]
)
except Exception as exc:
_logger.exception("Error while streaming data for component %d", comp_id)
raise exc

async def _update_streams(
self,
Expand Down
25 changes: 19 additions & 6 deletions src/frequenz/sdk/actor/_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import asyncio
import dataclasses
import logging
from typing import Callable

from frequenz.channels import Receiver, Sender

from .._internal._asyncio import cancel_and_await
from ..timeseries import Sample
from ..timeseries._quantities import Quantity
from ..timeseries._base_types import Sample
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
from ._actor import Actor
from ._channel_registry import ChannelRegistry
Expand All @@ -21,6 +21,17 @@
_logger = logging.getLogger(__name__)


# We need to use the dataclass decorator again because we are making a required
# attribute optional, so we need the dataclass to re-generate the constructor with the
# new signature.
@dataclasses.dataclass(frozen=True)
class ResamplingActorConfig(ResamplerConfig[float]):
"""Configuration for the resampling actor."""

value_constructor: Callable[[float], float] = float
"""The constructor to use to create new sample values."""


class ComponentMetricsResamplingActor(Actor):
"""An actor to resample microgrid component metrics."""

Expand All @@ -30,7 +41,7 @@ def __init__( # pylint: disable=too-many-arguments
channel_registry: ChannelRegistry,
data_sourcing_request_sender: Sender[ComponentMetricRequest],
resampling_request_receiver: Receiver[ComponentMetricRequest],
config: ResamplerConfig,
config: ResamplingActorConfig,
name: str | None = None,
) -> None:
"""Initialize an instance.
Expand All @@ -49,13 +60,15 @@ def __init__( # pylint: disable=too-many-arguments
"""
super().__init__(name=name)
self._channel_registry: ChannelRegistry = channel_registry

self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = (
data_sourcing_request_sender
)
self._resampling_request_receiver: Receiver[ComponentMetricRequest] = (
resampling_request_receiver
)
self._resampler: Resampler = Resampler(config)
self._resampler: Resampler[float] = Resampler(config)

self._active_req_channels: set[str] = set()

async def _subscribe(self, request: ComponentMetricRequest) -> None:
Expand All @@ -78,13 +91,13 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
data_source_channel_name = data_source_request.get_channel_name()
await self._data_sourcing_request_sender.send(data_source_request)
receiver = self._channel_registry.get_or_create(
Sample[Quantity], data_source_channel_name
Sample[float], data_source_channel_name
).new_receiver()

# This is a temporary hack until the Sender implementation uses
# exceptions to report errors.
sender = self._channel_registry.get_or_create(
Sample[Quantity], request_channel_name
Sample[float], request_channel_name
).new_sender()

self._resampler.add_timeseries(request_channel_name, receiver, sender.send)
Expand Down
10 changes: 8 additions & 2 deletions src/frequenz/sdk/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@
to limit the charge power of individual EV Chargers.
""" # noqa: D205, D400

from ..actor import ResamplerConfig
import typing

from . import _data_pipeline, client, component, connection_manager, metadata
from ._data_pipeline import (
battery_pool,
Expand All @@ -134,8 +135,13 @@
voltage,
)

if typing.TYPE_CHECKING:
from ..actor._resampling import ResamplingActorConfig


async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> None:
async def initialize(
host: str, port: int, resampler_config: "ResamplingActorConfig"
) -> None:
"""Initialize the microgrid connection manager and the data pipeline.

Args:
Expand Down
8 changes: 4 additions & 4 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#
# pylint: disable=import-outside-toplevel
if typing.TYPE_CHECKING:
from ..actor import ComponentMetricRequest, ResamplerConfig, _power_managing
from ..actor import ComponentMetricRequest, ResamplingActorConfig, _power_managing
from ..actor.power_distributing import ( # noqa: F401 (imports used by string type hints)
ComponentPoolStatus,
PowerDistributingActor,
Expand Down Expand Up @@ -81,7 +81,7 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes

def __init__(
self,
resampler_config: ResamplerConfig,
resampler_config: ResamplingActorConfig,
) -> None:
"""Create a `DataPipeline` instance.

Expand All @@ -90,7 +90,7 @@ def __init__(
"""
from ..actor import ChannelRegistry

self._resampler_config: ResamplerConfig = resampler_config
self._resampler_config: ResamplingActorConfig = resampler_config

self._channel_registry: ChannelRegistry = ChannelRegistry(
name="Data Pipeline Registry"
Expand Down Expand Up @@ -408,7 +408,7 @@ async def _stop(self) -> None:
_DATA_PIPELINE: _DataPipeline | None = None


async def initialize(resampler_config: ResamplerConfig) -> None:
async def initialize(resampler_config: ResamplingActorConfig) -> None:
"""Initialize a `DataPipeline` instance.

Args:
Expand Down
Loading
Loading