Skip to content

Commit

Permalink
Implement receive_component_data_samples_stream method
Browse files Browse the repository at this point in the history
This method allows to stream data samples from a component. It uses a
`GrpcStreamBroadcaster` to handle the streaming of the data samples,
even when it doesn't make a lot of sense given how the API works now (it
is much less likely that we will have the same request twice,
justifiying the reuse of the broadcaster/data channel).

To minimize changes, we go with this approach but it will probably be
changed in the future.

Signed-off-by: Leandro Lucarella <[email protected]>
  • Loading branch information
llucax committed Oct 7, 2024
1 parent cd639e9 commit 01e066a
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 2 deletions.
87 changes: 85 additions & 2 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from collections.abc import Iterable
from dataclasses import replace
from datetime import datetime, timedelta
from typing import Any, assert_never
from typing import assert_never

from frequenz.api.common.v1.metrics import metric_sample_pb2
from frequenz.api.common.v1.microgrid.components import components_pb2
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
from frequenz.channels import Receiver
from frequenz.client.base import channel, client, conversion, retry, streaming
from google.protobuf.empty_pb2 import Empty

Expand All @@ -23,6 +25,9 @@
from .component._component_proto import component_from_proto
from .component._connection import ComponentConnection
from .component._connection_proto import component_connection_from_proto
from .component._data_samples import ComponentDataSamples
from .component._data_samples_proto import component_data_sample_from_proto
from .metrics._metric import Metric

DEFAULT_GRPC_CALL_TIMEOUT = 60.0
"""The default timeout for gRPC calls made by this client (in seconds)."""
Expand Down Expand Up @@ -74,7 +79,12 @@ def __init__(
channel_defaults=channel_defaults,
)
self._async_stub: microgrid_pb2_grpc.MicrogridAsyncStub = self.stub # type: ignore
self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {}
self._component_data_samples_broadcasters: dict[
str,
streaming.GrpcStreamBroadcaster[
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
],
] = {}
self._retry_strategy = retry_strategy

async def get_microgrid_info( # noqa: DOC502 (raises ApiClientError indirectly)
Expand Down Expand Up @@ -375,6 +385,68 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i

return None

# noqa: DOC502 (Raises ApiClientError indirectly)
async def receive_component_data_samples_stream(
self,
component: ComponentId | Component,
metrics: Iterable[Metric | int],
*,
buffer_size: int = 50,
) -> Receiver[ComponentDataSamples]:
"""Stream data samples from a component.
At least one metric must be specified. If no metric is specified, then the
stream will raise an error.
Warning:
Components may not support all metrics. If a component does not support
a given metric, then the returned data stream will not contain that metric.
There is no way to tell if a metric is not being received because the
component does not support it or because there is a transient issue when
retrieving the metric from the component.
The supported metrics by a component can even change with time, for example,
if a component is updated with new firmware.
Args:
component: The component to stream data from.
metrics: List of metrics to return. Only the specified metrics will be
returned.
buffer_size: The maximum number of messages to buffer in the returned
receiver. After this limit is reached, the oldest messages will be
dropped.
Returns:
The data stream from the component.
"""
component_id = _get_component_id(component)
metrics_set = frozenset([_get_metric_value(m) for m in metrics])
key = f"{component_id}-{hash(metrics_set)}"
broadcaster = self._component_data_samples_broadcasters.get(key)
if broadcaster is None:
client_id = hex(id(self))[2:]
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
create_filter = (
microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter
)
broadcaster = streaming.GrpcStreamBroadcaster(
stream_name,
lambda: aiter(
self._async_stub.ReceiveComponentDataStream(
microgrid_pb2.ReceiveComponentDataStreamRequest(
component_id=_get_component_id(component),
filter=create_filter(metrics=metrics_set),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
),
lambda msg: component_data_sample_from_proto(msg.data),
retry_strategy=self._retry_strategy,
)
self._component_data_samples_broadcasters[key] = broadcaster
return broadcaster.new_receiver(maxsize=buffer_size)


def _get_component_id(component: ComponentId | Component) -> int:
"""Get the component ID from a component or component ID."""
Expand All @@ -387,6 +459,17 @@ def _get_component_id(component: ComponentId | Component) -> int:
assert_never(unexpected)


def _get_metric_value(metric: Metric | int) -> metric_sample_pb2.Metric.ValueType:
"""Get the metric ID from a metric or metric ID."""
match metric:
case Metric():
return metric_sample_pb2.Metric.ValueType(metric.value)
case int():
return metric_sample_pb2.Metric.ValueType(metric)
case unexpected:
assert_never(unexpected)


def _get_category_value(
category: ComponentCategory | int,
) -> components_pb2.ComponentCategory.ValueType:
Expand Down
24 changes: 24 additions & 0 deletions src/frequenz/client/microgrid/component/_data_samples.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Definition of a component data aggregate."""

from dataclasses import dataclass

from .._id import ComponentId
from ..metrics._sample import MetricSample
from ._state_sample import ComponentStateSample


@dataclass(frozen=True, kw_only=True)
class ComponentDataSamples:
"""An aggregate of multiple metrics, states, and errors of a component."""

component_id: ComponentId
"""The unique identifier of the component."""

metrics: list[MetricSample]
"""The metrics sampled from the component."""

states: list[ComponentStateSample]
"""The states sampled from the component."""
35 changes: 35 additions & 0 deletions src/frequenz/client/microgrid/component/_data_samples_proto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Loading of ComponentDataSamples objects from protobuf messages."""


from frequenz.api.common.v1.microgrid.components import components_pb2

from .._id import ComponentId
from ..metrics._sample_proto import metric_sample_from_proto
from ._data_samples import ComponentDataSamples
from ._state_sample_proto import component_state_sample_from_proto


def component_data_sample_from_proto(
message: components_pb2.ComponentData,
) -> ComponentDataSamples:
"""Convert a protobuf component data message to a component data object.
Args:
message: The protobuf message to convert.
Returns:
The resulting `ComponentDataSamples` object.
"""
# At some point it might make sense to also log issues found in the samples, but
# using a naive approach like in `component_from_proto` might spam the logs too
# much, as we can receive several samples per second, and if a component is in
# a unrecognized state for long, it will mean we will emit the same log message
# again and again.
return ComponentDataSamples(
component_id=ComponentId(message.component_id),
metrics=[metric_sample_from_proto(sample) for sample in message.metric_samples],
states=[component_state_sample_from_proto(state) for state in message.states],
)
Loading

0 comments on commit 01e066a

Please sign in to comment.