diff --git a/src/frequenz/client/microgrid/_client.py b/src/frequenz/client/microgrid/_client.py index 471235f..6cf154f 100644 --- a/src/frequenz/client/microgrid/_client.py +++ b/src/frequenz/client/microgrid/_client.py @@ -7,10 +7,12 @@ from collections.abc import Iterable from dataclasses import replace from datetime import datetime, timedelta -from typing import Any, assert_never +from typing import assert_never +from frequenz.api.common.v1.metrics import metric_sample_pb2 from frequenz.api.common.v1.microgrid.components import components_pb2 from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc +from frequenz.channels import Receiver from frequenz.client.base import channel, client, conversion, retry, streaming from google.protobuf.empty_pb2 import Empty @@ -23,6 +25,9 @@ from .component._component_proto import component_from_proto from .component._connection import ComponentConnection from .component._connection_proto import component_connection_from_proto +from .component._data_samples import ComponentDataSamples +from .component._data_samples_proto import component_data_sample_from_proto +from .metrics._metric import Metric DEFAULT_GRPC_CALL_TIMEOUT = 60.0 """The default timeout for gRPC calls made by this client (in seconds).""" @@ -74,7 +79,12 @@ def __init__( channel_defaults=channel_defaults, ) self._async_stub: microgrid_pb2_grpc.MicrogridAsyncStub = self.stub # type: ignore - self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {} + self._component_data_samples_broadcasters: dict[ + str, + streaming.GrpcStreamBroadcaster[ + microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples + ], + ] = {} self._retry_strategy = retry_strategy async def get_microgrid_info( # noqa: DOC502 (raises ApiClientError indirectly) @@ -375,6 +385,68 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i return None + # noqa: DOC502 (Raises ApiClientError indirectly) + async def receive_component_data_samples_stream( + self, + component: ComponentId | Component, + metrics: Iterable[Metric | int], + *, + buffer_size: int = 50, + ) -> Receiver[ComponentDataSamples]: + """Stream data samples from a component. + + At least one metric must be specified. If no metric is specified, then the + stream will raise an error. + + Warning: + Components may not support all metrics. If a component does not support + a given metric, then the returned data stream will not contain that metric. + + There is no way to tell if a metric is not being received because the + component does not support it or because there is a transient issue when + retrieving the metric from the component. + + The supported metrics by a component can even change with time, for example, + if a component is updated with new firmware. + + Args: + component: The component to stream data from. + metrics: List of metrics to return. Only the specified metrics will be + returned. + buffer_size: The maximum number of messages to buffer in the returned + receiver. After this limit is reached, the oldest messages will be + dropped. + + Returns: + The data stream from the component. + """ + component_id = _get_component_id(component) + metrics_set = frozenset([_get_metric_value(m) for m in metrics]) + key = f"{component_id}-{hash(metrics_set)}" + broadcaster = self._component_data_samples_broadcasters.get(key) + if broadcaster is None: + client_id = hex(id(self))[2:] + stream_name = f"microgrid-client-{client_id}-component-data-{key}" + create_filter = ( + microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter + ) + broadcaster = streaming.GrpcStreamBroadcaster( + stream_name, + lambda: aiter( + self._async_stub.ReceiveComponentDataStream( + microgrid_pb2.ReceiveComponentDataStreamRequest( + component_id=_get_component_id(component), + filter=create_filter(metrics=metrics_set), + ), + timeout=int(DEFAULT_GRPC_CALL_TIMEOUT), + ) + ), + lambda msg: component_data_sample_from_proto(msg.data), + retry_strategy=self._retry_strategy, + ) + self._component_data_samples_broadcasters[key] = broadcaster + return broadcaster.new_receiver(maxsize=buffer_size) + def _get_component_id(component: ComponentId | Component) -> int: """Get the component ID from a component or component ID.""" @@ -387,6 +459,17 @@ def _get_component_id(component: ComponentId | Component) -> int: assert_never(unexpected) +def _get_metric_value(metric: Metric | int) -> metric_sample_pb2.Metric.ValueType: + """Get the metric ID from a metric or metric ID.""" + match metric: + case Metric(): + return metric_sample_pb2.Metric.ValueType(metric.value) + case int(): + return metric_sample_pb2.Metric.ValueType(metric) + case unexpected: + assert_never(unexpected) + + def _get_category_value( category: ComponentCategory | int, ) -> components_pb2.ComponentCategory.ValueType: diff --git a/src/frequenz/client/microgrid/component/_data_samples.py b/src/frequenz/client/microgrid/component/_data_samples.py new file mode 100644 index 0000000..62381e3 --- /dev/null +++ b/src/frequenz/client/microgrid/component/_data_samples.py @@ -0,0 +1,24 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Definition of a component data aggregate.""" + +from dataclasses import dataclass + +from .._id import ComponentId +from ..metrics._sample import MetricSample +from ._state_sample import ComponentStateSample + + +@dataclass(frozen=True, kw_only=True) +class ComponentDataSamples: + """An aggregate of multiple metrics, states, and errors of a component.""" + + component_id: ComponentId + """The unique identifier of the component.""" + + metrics: list[MetricSample] + """The metrics sampled from the component.""" + + states: list[ComponentStateSample] + """The states sampled from the component.""" diff --git a/src/frequenz/client/microgrid/component/_data_samples_proto.py b/src/frequenz/client/microgrid/component/_data_samples_proto.py new file mode 100644 index 0000000..491263f --- /dev/null +++ b/src/frequenz/client/microgrid/component/_data_samples_proto.py @@ -0,0 +1,35 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Loading of ComponentDataSamples objects from protobuf messages.""" + + +from frequenz.api.common.v1.microgrid.components import components_pb2 + +from .._id import ComponentId +from ..metrics._sample_proto import metric_sample_from_proto +from ._data_samples import ComponentDataSamples +from ._state_sample_proto import component_state_sample_from_proto + + +def component_data_sample_from_proto( + message: components_pb2.ComponentData, +) -> ComponentDataSamples: + """Convert a protobuf component data message to a component data object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `ComponentDataSamples` object. + """ + # At some point it might make sense to also log issues found in the samples, but + # using a naive approach like in `component_from_proto` might spam the logs too + # much, as we can receive several samples per second, and if a component is in + # a unrecognized state for long, it will mean we will emit the same log message + # again and again. + return ComponentDataSamples( + component_id=ComponentId(message.component_id), + metrics=[metric_sample_from_proto(sample) for sample in message.metric_samples], + states=[component_state_sample_from_proto(state) for state in message.states], + ) diff --git a/src/frequenz/client/microgrid/component/_state_sample.py b/src/frequenz/client/microgrid/component/_state_sample.py new file mode 100644 index 0000000..8b65380 --- /dev/null +++ b/src/frequenz/client/microgrid/component/_state_sample.py @@ -0,0 +1,221 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Definition of a component state.""" + +import enum +from dataclasses import dataclass +from datetime import datetime + + +@enum.unique +class ComponentStateCode(enum.Enum): + """The various states that a component can be in.""" + + UNSPECIFIED = 0 + """The state is unspecified (this should not be normally used).""" + + UNKNOWN = 1 + """The component is in an unknown or undefined condition. + + This is used when the state can be retrieved from the component but it doesn't match + any known state. + """ + + UNAVAILABLE = 2 + """The component is temporarily unavailable for operation.""" + + SWITCHING_OFF = 3 + """The component is in the process of switching off.""" + + OFF = 4 + """The component has successfully switched off.""" + + SWITCHING_ON = 5 + """The component is in the process of switching on.""" + + STANDBY = 6 + """The component is in standby mode and not immediately ready for operation.""" + + READY = 7 + """The component is fully operational and ready for use.""" + + CHARGING = 8 + """The component is actively consuming energy.""" + + DISCHARGING = 9 + """The component is actively producing or releasing energy.""" + + ERROR = 10 + """The component is in an error state and may need attention.""" + + EV_CHARGING_CABLE_UNPLUGGED = 20 + """The EV charging cable is unplugged from the charging station.""" + + EV_CHARGING_CABLE_PLUGGED_AT_STATION = 21 + """The EV charging cable is plugged into the charging station.""" + + EV_CHARGING_CABLE_PLUGGED_AT_EV = 22 + """The EV charging cable is plugged into the vehicle.""" + + EV_CHARGING_CABLE_LOCKED_AT_STATION = 23 + """The EV charging cable is locked at the charging station end.""" + + EV_CHARGING_CABLE_LOCKED_AT_EV = 24 + """The EV charging cable is locked at the vehicle end.""" + + RELAY_OPEN = 30 + """The relay is in an open state, meaning no current can flow through.""" + + RELAY_CLOSED = 31 + """The relay is in a closed state, allowing current to flow.""" + + PRECHARGER_OPEN = 40 + """The precharger circuit is open, meaning it's not currently active.""" + + PRECHARGER_PRECHARGING = 41 + """The precharger is in a precharging state, preparing the main circuit for activation.""" + + PRECHARGER_CLOSED = 42 + """The precharger circuit is closed, allowing full current to flow to the main circuit.""" + + +@enum.unique +class ComponentErrorCode(enum.Enum): + """The various errors that a component can report.""" + + UNSPECIFIED = 0 + """The error is unspecified (this should not be normally used).""" + + UNKNOWN = 1 + """The component is reporting an unknown or undefined error. + + This is used when the error can be retrieved from the component but it doesn't match + any known error. + """ + + SWITCH_ON_FAULT = 2 + """The component could not be switched on.""" + + UNDERVOLTAGE = 3 + """The component is operating under the minimum rated voltage.""" + + OVERVOLTAGE = 4 + """The component is operating over the maximum rated voltage.""" + + OVERCURRENT = 5 + """The component is drawing more current than the maximum rated value.""" + + OVERCURRENT_CHARGING = 6 + """The component's consumption current is over the maximum rated value during charging.""" + + OVERCURRENT_DISCHARGING = 7 + """The component's production current is over the maximum rated value during discharging.""" + + OVERTEMPERATURE = 8 + """The component is operating over the maximum rated temperature.""" + + UNDERTEMPERATURE = 9 + """The component is operating under the minimum rated temperature.""" + + HIGH_HUMIDITY = 10 + """The component is exposed to high humidity levels over the maximum rated value.""" + + FUSE_ERROR = 11 + """The component's fuse has blown.""" + + PRECHARGE_ERROR = 12 + """The component's precharge unit has failed.""" + + PLAUSIBILITY_ERROR = 13 + """Plausibility issues within the system involving this component.""" + + UNDERVOLTAGE_SHUTDOWN = 14 + """System shutdown due to undervoltage involving this component.""" + + EV_UNEXPECTED_PILOT_FAILURE = 15 + """Unexpected pilot failure in an electric vehicle component.""" + + FAULT_CURRENT = 16 + """Fault current detected in the component.""" + + SHORT_CIRCUIT = 17 + """Short circuit detected in the component.""" + + CONFIG_ERROR = 18 + """Configuration error related to the component.""" + + ILLEGAL_COMPONENT_STATE_CODE_REQUESTED = 19 + """Illegal state requested for the component.""" + + HARDWARE_INACCESSIBLE = 20 + """Hardware of the component is inaccessible.""" + + INTERNAL = 21 + """Internal error within the component.""" + + UNAUTHORIZED = 22 + """The component is unauthorized to perform the last requested action.""" + + EV_CHARGING_CABLE_UNPLUGGED_FROM_STATION = 40 + """EV cable was abruptly unplugged from the charging station.""" + + EV_CHARGING_CABLE_UNPLUGGED_FROM_EV = 41 + """EV cable was abruptly unplugged from the vehicle.""" + + EV_CHARGING_CABLE_LOCK_FAILED = 42 + """EV cable lock failure.""" + + EV_CHARGING_CABLE_INVALID = 43 + """Invalid EV cable.""" + + EV_CONSUMER_INCOMPATIBLE = 44 + """Incompatible EV plug.""" + + BATTERY_IMBALANCE = 50 + """Battery system imbalance.""" + + BATTERY_LOW_SOH = 51 + """Low state of health (SOH) detected in the battery.""" + + BATTERY_BLOCK_ERROR = 52 + """Battery block error.""" + + BATTERY_CONTROLLER_ERROR = 53 + """Battery controller error.""" + + BATTERY_RELAY_ERROR = 54 + """Battery relay error.""" + + BATTERY_CALIBRATION_NEEDED = 56 + """Battery calibration is needed.""" + + RELAY_CYCLE_LIMIT_REACHED = 60 + """Relays have been cycled for the maximum number of times.""" + + +@dataclass(frozen=True, kw_only=True) +class ComponentStateSample: + """A collection of the state, warnings, and errors for a component at a specific time.""" + + sampled_at: datetime + """The time at which this state was sampled.""" + + states: frozenset[ComponentStateCode | int] + """The set of states of the component. + + If the reported state is not known by the client (it could happen when using an + older version of the client with a newer version of the server), it will be + represented as an `int` and **not** the + [`ComponentStateCode.UNKNOWN`][frequenz.client.microgrid.component.ComponentStateCode.UNKNOWN] + value (this value is used only when the state is not known by the server). + """ + + warnings: frozenset[ComponentErrorCode | int] + """The set of warnings for the component.""" + + errors: frozenset[ComponentErrorCode | int] + """The set of errors for the component. + + This set will only contain errors if the component is in an error state. + """ diff --git a/src/frequenz/client/microgrid/component/_state_sample_proto.py b/src/frequenz/client/microgrid/component/_state_sample_proto.py new file mode 100644 index 0000000..e5f2f55 --- /dev/null +++ b/src/frequenz/client/microgrid/component/_state_sample_proto.py @@ -0,0 +1,35 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Loading of MetricSample and AggregatedMetricValue objects from protobuf messages.""" + +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.client.base import conversion + +from .._util import enum_from_proto +from ._state_sample import ComponentErrorCode, ComponentStateCode, ComponentStateSample + + +def component_state_sample_from_proto( + message: components_pb2.ComponentState, +) -> ComponentStateSample: + """Convert a protobuf message to a `ComponentStateSample` object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `ComponentStateSample` object. + """ + return ComponentStateSample( + sampled_at=conversion.to_datetime(message.sampled_at), + states=frozenset( + enum_from_proto(s, ComponentStateCode) for s in message.states + ), + warnings=frozenset( + enum_from_proto(w, ComponentErrorCode) for w in message.warnings + ), + errors=frozenset( + enum_from_proto(e, ComponentErrorCode) for e in message.errors + ), + ) diff --git a/src/frequenz/client/microgrid/metrics/_sample.py b/src/frequenz/client/microgrid/metrics/_sample.py new file mode 100644 index 0000000..47fb626 --- /dev/null +++ b/src/frequenz/client/microgrid/metrics/_sample.py @@ -0,0 +1,174 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Definition of a metric sample.""" + +import enum +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import datetime +from typing import assert_never + +from ._bounds import Bounds +from ._metric import Metric + + +@enum.unique +class AggregationMethod(enum.Enum): + """The type of the aggregated value.""" + + AVG = "avg" + """The average value of the metric.""" + + MIN = "min" + """The minimum value of the metric.""" + + MAX = "max" + """The maximum value of the metric.""" + + +@dataclass(frozen=True, kw_only=True) +class AggregatedMetricValue: + """Encapsulates derived statistical summaries of a single metric. + + The message allows for the reporting of statistical summaries — minimum, + maximum, and average values - as well as the complete list of individual + samples if available. + + This message represents derived metrics and contains fields for statistical + summaries—minimum, maximum, and average values. Individual measurements are + are optional, accommodating scenarios where only subsets of this information + are available. + """ + + avg: float + """The derived average value of the metric.""" + + min: float | None + """The minimum measured value of the metric.""" + + max: float | None + """The maximum measured value of the metric.""" + + raw_values: Sequence[float] + """All the raw individual values (it might be empty if not provided by the component).""" + + def __str__(self) -> str: + """Return the short string representation of this instance.""" + extra: list[str] = [] + if self.min is not None: + extra.append(f"min:{self.min}") + if self.max is not None: + extra.append(f"max:{self.max}") + if len(self.raw_values) > 0: + extra.append(f"num_raw:{len(self.raw_values)}") + extra_str = f"<{' '.join(extra)}>" if extra else "" + return f"avg:{self.avg}{extra_str}" + + +@dataclass(frozen=True, kw_only=True) +class MetricSample: + """A sampled metric. + + This represents a single sample of a specific metric, the value of which is either + measured at a particular time. The real-time system-defined bounds are optional and + may not always be present or set. + + Note: Relationship Between Bounds and Metric Samples + Suppose a metric sample for active power has a lower-bound of -10,000 W, and an + upper-bound of 10,000 W. For the system to accept a charge command, clients need + to request current values within the bounds. + """ + + sampled_at: datetime + """The moment when the metric was sampled.""" + + metric: Metric | int + """The metric that was sampled.""" + + # In the protocol this is float | AggregatedMetricValue, but for live data we can't + # receive the AggregatedMetricValue, so we limit this to float for now. + value: float | AggregatedMetricValue | None + """The value of the sampled metric.""" + + bounds: list[Bounds] + """The bounds that apply to the metric sample. + + These bounds adapt in real-time to reflect the operating conditions at the time of + aggregation or derivation. + + In the case of certain components like batteries, multiple bounds might exist. These + multiple bounds collectively extend the range of allowable values, effectively + forming a union of all given bounds. In such cases, the value of the metric must be + within at least one of the bounds. + + In accordance with the passive sign convention, bounds that limit discharge would + have negative numbers, while those limiting charge, such as for the State of Power + (SoP) metric, would be positive. Hence bounds can have positive and negative values + depending on the metric they represent. + + Example: + The diagram below illustrates the relationship between the bounds. + + ``` + bound[0].lower bound[1].upper + <-------|============|------------------|============|---------> + bound[0].upper bound[1].lower + + ---- values here are disallowed and will be rejected + ==== values here are allowed and will be accepted + ``` + """ + + connection: str | None = None + """The electrical connection within the component from which the metric was sampled. + + This will be present when the same `Metric` can be obtained from multiple + electrical connections within the component. Knowing the connection can help in + certain control and monitoring applications. + + In cases where the component has just one connection for a metric, then the + connection is `None`. + + Example: + A hybrid inverter can have a DC string for a battery and another DC string for a + PV array. The connection names could resemble, say, `dc_battery_0` and + ``dc_pv_0`. A metric like DC voltage can be obtained from both connections. For + an application to determine the SoC of the battery using the battery voltage, + which connection the voltage metric was sampled from is important. + """ + + def as_single_value( + self, *, aggregation_method: AggregationMethod = AggregationMethod.AVG + ) -> float | None: + """Return the value of this sample as a single value. + + if [`value`][frequenz.client.microgrid.metric.MetricSample.value] is a `float`, + it is returned as is. If `value` is an + [`AggregatedMetricValue`][frequenz.client.microgrid.metric.AggregatedMetricValue], + the value is aggregated using the provided `aggregation_method`. + + Args: + aggregation_method: The method to use to aggregate the value when `value` is + a `AggregatedMetricValue`. + + Returns: + The value of the sample as a single value, or `None` if the value is `None`. + """ + match self.value: + case float() | int(): + return self.value + case AggregatedMetricValue(): + match aggregation_method: + case AggregationMethod.AVG: + return self.value.avg + case AggregationMethod.MIN: + return self.value.min + case AggregationMethod.MAX: + return self.value.max + case unexpected: + assert_never(unexpected) + case None: + return None + case unexpected: + assert_never(unexpected) diff --git a/src/frequenz/client/microgrid/metrics/_sample_proto.py b/src/frequenz/client/microgrid/metrics/_sample_proto.py new file mode 100644 index 0000000..2646185 --- /dev/null +++ b/src/frequenz/client/microgrid/metrics/_sample_proto.py @@ -0,0 +1,62 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Loading of MetricSample and AggregatedMetricValue objects from protobuf messages.""" + +from frequenz.api.common.v1.metrics import metric_sample_pb2 +from frequenz.client.base import conversion + +from frequenz.client.microgrid.metrics._bounds_proto import bounds_from_proto + +from .._util import enum_from_proto +from ..metrics._metric import Metric +from ._sample import AggregatedMetricValue, MetricSample + + +def aggregated_metric_sample_from_proto( + message: metric_sample_pb2.AggregatedMetricValue, +) -> AggregatedMetricValue: + """Convert a protobuf message to a `AggregatedMetricValue` object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `AggregatedMetricValue` object. + """ + return AggregatedMetricValue( + avg=message.avg_value, + min=message.min_value, + max=message.max_value, + raw_values=message.raw_values, + ) + + +def metric_sample_from_proto( + message: metric_sample_pb2.MetricSample, +) -> MetricSample: + """Convert a protobuf message to a `MetricSample` object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `MetricSample` object. + """ + value: float | AggregatedMetricValue | None = None + if message.HasField("value"): + match message.value.WhichOneof("metric_value_variant"): + case "simple_metric": + value = message.value.simple_metric.value + case "aggregated_metric": + value = aggregated_metric_sample_from_proto( + message.value.aggregated_metric + ) + + return MetricSample( + sampled_at=conversion.to_datetime(message.sampled_at), + metric=enum_from_proto(message.metric, Metric), + value=value, + bounds=[bounds_from_proto(b) for b in message.bounds], + connection=message.source, + )