diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index db4f8993e..4fee48cf2 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,6 +13,7 @@ * Original methods `{set_power/charge/discharge}` are now replaced by `propose_{power/charge/discharge}` * The `propose_*` methods send power proposals to the `PowerManagingActor`, where it can be overridden by proposals from other actors. * They no longer have the `adjust_power` flag, because the `PowerManagingActor` will always adjust power to fit within the available bounds. + * They no longer have a `include_broken_batteries` parameter. The feature has been removed. - `BatteryPool`'s reporting methods diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index 5740a7d60..578a6763f 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -104,20 +104,6 @@ class Proposal: request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0) """The maximum amount of time to wait for the request to be fulfilled.""" - include_broken_batteries: bool = False - """Whether to use all batteries included in the batteries set regardless the status. - - If set to `True`, the power distribution algorithm will consider all batteries, - including the broken ones, when distributing power. In such cases, any remaining - power after distributing among the available batteries will be distributed equally - among the unavailable (broken) batteries. If all batteries in the set are - unavailable, the power will be equally distributed among all the unavailable - batteries in the request. - - If set to `False`, the power distribution will only take into account the available - batteries, excluding any broken ones. - """ - def __lt__(self, other: Proposal) -> bool: """Compare two proposals by their priority. diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 334df1f12..826dfa16d 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -157,9 +157,6 @@ async def _send_updated_target_power( request_timeout = ( proposal.request_timeout if proposal else timedelta(seconds=5.0) ) - include_broken_batteries = ( - proposal.include_broken_batteries if proposal else False - ) if target_power is not None: await self._power_distributing_requests_sender.send( power_distributing.Request( @@ -167,7 +164,6 @@ async def _send_updated_target_power( batteries=battery_ids, request_timeout=request_timeout, adjust_power=True, - include_broken_batteries=include_broken_batteries, ) ) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index c9c52ebca..d23a67df0 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -14,14 +14,12 @@ import asyncio import logging -import time from asyncio.tasks import ALL_COMPLETED from collections import abc from collections.abc import Iterable -from dataclasses import dataclass from datetime import timedelta from math import isnan -from typing import Any, Self, TypeVar, cast +from typing import Any, TypeVar, cast import grpc from frequenz.channels import Peekable, Receiver, Sender @@ -51,40 +49,6 @@ _logger = logging.getLogger(__name__) -@dataclass -class _CacheEntry: - """Represents an entry in the cache with expiry time.""" - - inv_bat_pair: InvBatPair - """The inverter and adjacent battery data pair.""" - - expiry_time: int - """The expiration time (taken from the monotonic clock) of the cache entry.""" - - @classmethod - def from_ttl( - cls, inv_bat_pair: InvBatPair, ttl: timedelta = timedelta(hours=2.5) - ) -> Self: - """Initialize a CacheEntry instance from a TTL (Time-To-Live). - - Args: - inv_bat_pair: the inverter and adjacent battery data pair to cache. - ttl: the time a cache entry is kept alive. - - Returns: - this class instance. - """ - return cls(inv_bat_pair, time.monotonic_ns() + int(ttl.total_seconds() * 1e9)) - - def has_expired(self) -> bool: - """Check whether the cache entry has expired. - - Returns: - whether the cache entry has expired. - """ - return time.monotonic_ns() >= self.expiry_time - - def _get_all(source: dict[int, frozenset[int]], keys: abc.Set[int]) -> set[int]: """Get all values for the given keys from the given map. @@ -180,10 +144,6 @@ def __init__( max_data_age_sec=10.0, ) - self._cached_metrics: dict[frozenset[int], _CacheEntry | None] = { - bat_ids: None for bat_ids in self._bat_bats_map.values() - } - def _get_bounds( self, pairs_data: list[InvBatPair], @@ -254,13 +214,13 @@ async def _run(self) -> None: # pylint: disable=too-many-locals async for request in self._requests_receiver: try: pairs_data: list[InvBatPair] = self._get_components_data( - request.batteries, request.include_broken_batteries + request.batteries ) except KeyError as err: await self._result_sender.send(Error(request=request, msg=str(err))) continue - if not pairs_data and not request.include_broken_batteries: + if not pairs_data: error_msg = ( "No data for at least one of the given " f"batteries {str(request.batteries)}" @@ -390,24 +350,10 @@ def _get_power_distribution( ]: unavailable_inv_ids = unavailable_inv_ids.union(inverter_ids) - if request.include_broken_batteries and not available_bat_ids: - return self.distribution_algorithm.distribute_power_equally( - request.power.as_watts(), unavailable_inv_ids - ) - result = self.distribution_algorithm.distribute_power( request.power.as_watts(), inv_bat_pairs ) - if request.include_broken_batteries and unavailable_inv_ids: - additional_result = self.distribution_algorithm.distribute_power_equally( - result.remaining_power, unavailable_inv_ids - ) - - for inv_id, power in additional_result.distribution.items(): - result.distribution[inv_id] = power - result.remaining_power = 0.0 - return result def _check_request( @@ -526,15 +472,11 @@ def _get_components_pairs( {k: frozenset(v) for k, v in inv_invs_map.items()}, ) - def _get_components_data( - self, batteries: abc.Set[int], include_broken: bool - ) -> list[InvBatPair]: + def _get_components_data(self, batteries: abc.Set[int]) -> list[InvBatPair]: """Get data for the given batteries and adjacent inverters. Args: batteries: Batteries that needs data. - include_broken: whether all batteries in the batteries set in the - request must be used regardless the status. Raises: KeyError: If any battery in the given list doesn't exists in microgrid. @@ -544,11 +486,7 @@ def _get_components_data( """ pairs_data: list[InvBatPair] = [] - working_batteries = ( - batteries - if include_broken - else self._all_battery_status.get_working_batteries(batteries) - ) + working_batteries = self._all_battery_status.get_working_batteries(batteries) for battery_id in working_batteries: if battery_id not in self._battery_receivers: @@ -579,12 +517,6 @@ def _get_components_data( inverter_ids: frozenset[int] = self._bat_inv_map[next(iter(battery_ids))] data = self._get_battery_inverter_data(battery_ids, inverter_ids) - if not data and include_broken: - cached_entry = self._cached_metrics[battery_ids] - if cached_entry and not cached_entry.has_expired(): - data = cached_entry.inv_bat_pair - else: - data = None if data is None: _logger.warning( "Skipping battery set %s because at least one of its messages isn't correct.", @@ -669,9 +601,7 @@ def nan_metric_in_list(data: list[DataType], metrics: list[str]) -> bool: ) return None - inv_bat_pair = InvBatPair(AggregatedBatteryData(battery_data), inverter_data) - self._cached_metrics[battery_ids] = _CacheEntry.from_ttl(inv_bat_pair) - return inv_bat_pair + return InvBatPair(AggregatedBatteryData(battery_data), inverter_data) async def _create_channels(self) -> None: """Create channels to get data of components in microgrid.""" diff --git a/src/frequenz/sdk/actor/power_distributing/request.py b/src/frequenz/sdk/actor/power_distributing/request.py index 9ed882abc..c206bd43f 100644 --- a/src/frequenz/sdk/actor/power_distributing/request.py +++ b/src/frequenz/sdk/actor/power_distributing/request.py @@ -32,17 +32,3 @@ class Request: If `False` and the power is outside the batteries' bounds, the request will fail and be replied to with an `OutOfBound` result. """ - - include_broken_batteries: bool = False - """Whether to use all batteries included in the batteries set regardless the status. - - If set to `True`, the power distribution algorithm will consider all batteries, - including the broken ones, when distributing power. In such cases, any remaining - power after distributing among the available batteries will be distributed equally - among the unavailable (broken) batteries. If all batteries in the set are - unavailable, the power will be equally distributed among all the unavailable - batteries in the request. - - If set to `False`, the power distribution will only take into account the available - batteries, excluding any broken ones. - """ diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 6db9304f6..224a3dc54 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -79,7 +79,6 @@ async def propose_power( power: Power | None, *, request_timeout: timedelta = timedelta(seconds=5.0), - include_broken_batteries: bool = False, bounds: timeseries.Bounds[Power | None] = timeseries.Bounds(None, None), ) -> None: """Send a proposal to the power manager for the pool's set of batteries. @@ -112,11 +111,6 @@ async def propose_power( specified. If both are `None`, it is equivalent to not having a proposal or withdrawing a previous one. request_timeout: The timeout for the request. - include_broken_batteries: if True, the power will be set for all batteries - in the pool, including the broken ones. If False, then the power will be - set only for the working batteries. This is not a guarantee that the - power will be set for all working batteries, as the microgrid API may - still reject the request. bounds: The power bounds for the proposal. These bounds will apply to actors with a lower priority, and can be overridden by bounds from actors with a higher priority. If None, the power bounds will be set @@ -131,7 +125,6 @@ async def propose_power( battery_ids=self._battery_pool._batteries, priority=self._priority, request_timeout=request_timeout, - include_broken_batteries=include_broken_batteries, ) ) @@ -140,7 +133,6 @@ async def propose_charge( power: Power | None, *, request_timeout: timedelta = timedelta(seconds=5.0), - include_broken_batteries: bool = False, ) -> None: """Set the given charge power for the batteries in the pool. @@ -164,11 +156,6 @@ async def propose_charge( If None, the proposed power of higher priority actors will take precedence as the target power. request_timeout: The timeout for the request. - include_broken_batteries: if True, the power will be set for all batteries - in the pool, including the broken ones. If False, then the power will be - set only for the working batteries. This is not a guarantee that the - power will be set for all working batteries, as the microgrid API may - still reject the request. Raises: ValueError: If the given power is negative. @@ -183,7 +170,6 @@ async def propose_charge( battery_ids=self._battery_pool._batteries, priority=self._priority, request_timeout=request_timeout, - include_broken_batteries=include_broken_batteries, ) ) @@ -192,7 +178,6 @@ async def propose_discharge( power: Power | None, *, request_timeout: timedelta = timedelta(seconds=5.0), - include_broken_batteries: bool = False, ) -> None: """Set the given discharge power for the batteries in the pool. @@ -216,11 +201,6 @@ async def propose_discharge( pool. If None, the proposed power of higher priority actors will take precedence as the target power. request_timeout: The timeout for the request. - include_broken_batteries: if True, the power will be set for all batteries - in the pool, including the broken ones. If False, then the power will be - set only for the working batteries. This is not a guarantee that the - power will be set for all working batteries, as the microgrid API may - still reject the request. Raises: ValueError: If the given power is negative. @@ -235,7 +215,6 @@ async def propose_discharge( battery_ids=self._battery_pool._batteries, priority=self._priority, request_timeout=request_timeout, - include_broken_batteries=include_broken_batteries, ) ) diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 6e042dcbb..c8729d9c6 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -1215,260 +1215,3 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non assert result.excess_power.isclose(Power.from_watts(700.0)) assert result.succeeded_power.isclose(Power.from_watts(500.0)) assert result.request == request - - async def test_use_all_batteries_none_is_working( - self, mocker: MockerFixture - ) -> None: - """Test all batteries are used if none of them works.""" - mockgrid = MockMicrogrid(grid_meter=False) - mockgrid.add_batteries(3) - await mockgrid.start(mocker) - await self.init_component_data(mockgrid) - - mocker.patch("asyncio.sleep", new_callable=AsyncMock) - - attrs: dict[str, set[int]] = {"get_working_batteries.return_value": set()} - mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", - return_value=MagicMock(spec=BatteryPoolStatus, **attrs), - ) - - requests_channel = Broadcast[Request]("power_distributor requests") - results_channel = Broadcast[Result]("power_distributor results") - - battery_status_channel = Broadcast[BatteryStatus]("battery_status") - async with PowerDistributingActor( - requests_receiver=requests_channel.new_receiver(), - results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), - ): - request = Request( - power=Power.from_kilowatts(1.2), - batteries={9, 19}, - request_timeout=SAFETY_TIMEOUT, - include_broken_batteries=True, - ) - - await requests_channel.new_sender().send(request) - result_rx = results_channel.new_receiver() - - done, pending = await asyncio.wait( - [asyncio.create_task(result_rx.receive())], - timeout=SAFETY_TIMEOUT.total_seconds(), - ) - - assert len(pending) == 0 - assert len(done) == 1 - result = done.pop().result() - assert isinstance(result, Success) - assert result.succeeded_batteries == {9, 19} - assert result.excess_power.isclose(Power.from_watts(200.0)) - assert result.succeeded_power.isclose(Power.from_kilowatts(1.0)) - assert result.request == request - - async def test_force_request_a_battery_is_not_working( - self, mocker: MockerFixture - ) -> None: - """Test force request when a battery is not working.""" - mockgrid = MockMicrogrid(grid_meter=False) - mockgrid.add_batteries(3) - await mockgrid.start(mocker) - await self.init_component_data(mockgrid) - - mocker.patch("asyncio.sleep", new_callable=AsyncMock) - - batteries = {9, 19} - - attrs = {"get_working_batteries.return_value": batteries - {9}} - mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", - return_value=MagicMock(spec=BatteryPoolStatus, **attrs), - ) - - requests_channel = Broadcast[Request]("power_distributor requests") - results_channel = Broadcast[Result]("power_distributor results") - - battery_status_channel = Broadcast[BatteryStatus]("battery_status") - async with PowerDistributingActor( - requests_receiver=requests_channel.new_receiver(), - results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), - ): - request = Request( - power=Power.from_kilowatts(1.2), - batteries=batteries, - request_timeout=SAFETY_TIMEOUT, - include_broken_batteries=True, - ) - - await requests_channel.new_sender().send(request) - result_rx = results_channel.new_receiver() - - done, pending = await asyncio.wait( - [asyncio.create_task(result_rx.receive())], - timeout=SAFETY_TIMEOUT.total_seconds(), - ) - - assert len(pending) == 0 - assert len(done) == 1 - result = done.pop().result() - assert isinstance(result, Success) - assert result.succeeded_batteries == {9, 19} - assert result.excess_power.isclose(Power.from_watts(200.0)) - assert result.succeeded_power.isclose(Power.from_kilowatts(1.0)) - assert result.request == request - - async def test_force_request_battery_nan_value_non_cached( - self, mocker: MockerFixture - ) -> None: - """Test battery with NaN in SoC, capacity or power is used if request is forced.""" - # pylint: disable=too-many-locals - mockgrid = MockMicrogrid(grid_meter=False) - mockgrid.add_batteries(3) - await mockgrid.start(mocker) - await self.init_component_data(mockgrid) - - mocker.patch("asyncio.sleep", new_callable=AsyncMock) - - batteries = {9, 19} - - attrs = {"get_working_batteries.return_value": batteries} - mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", - return_value=MagicMock(spec=BatteryPoolStatus, **attrs), - ) - - requests_channel = Broadcast[Request]("power_distributor requests") - results_channel = Broadcast[Result]("power_distributor results") - - battery_status_channel = Broadcast[BatteryStatus]("battery_status") - async with PowerDistributingActor( - requests_receiver=requests_channel.new_receiver(), - results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), - ): - request = Request( - power=Power.from_kilowatts(1.2), - batteries=batteries, - request_timeout=SAFETY_TIMEOUT, - include_broken_batteries=True, - ) - - batteries_data = ( - battery_msg( - 9, - soc=Metric(math.nan, Bound(20, 80)), - capacity=Metric(math.nan), - power=PowerBounds(-1000, 0, 0, 1000), - ), - battery_msg( - 19, - soc=Metric(40, Bound(20, 80)), - capacity=Metric(math.nan), - power=PowerBounds(-1000, 0, 0, 1000), - ), - ) - - for battery in batteries_data: - await mockgrid.mock_client.send(battery) - - await requests_channel.new_sender().send(request) - result_rx = results_channel.new_receiver() - - done, pending = await asyncio.wait( - [asyncio.create_task(result_rx.receive())], - timeout=SAFETY_TIMEOUT.total_seconds(), - ) - - assert len(pending) == 0 - assert len(done) == 1 - result: Result = done.pop().result() - assert isinstance(result, Success) - assert result.succeeded_batteries == batteries - assert result.succeeded_power.isclose(Power.from_kilowatts(1.2)) - assert result.excess_power.isclose(Power.zero(), abs_tol=1e-9) - assert result.request == request - - async def test_force_request_batteries_nan_values_cached( - self, mocker: MockerFixture - ) -> None: - """Test battery with NaN in SoC, capacity or power is used if request is forced.""" - mockgrid = MockMicrogrid(grid_meter=False) - mockgrid.add_batteries(3) - await mockgrid.start(mocker) - await self.init_component_data(mockgrid) - - mocker.patch("asyncio.sleep", new_callable=AsyncMock) - - batteries = {9, 19, 29} - - attrs = {"get_working_batteries.return_value": batteries} - mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", - return_value=MagicMock(spec=BatteryPoolStatus, **attrs), - ) - - requests_channel = Broadcast[Request]("power_distributor requests") - results_channel = Broadcast[Result]("power_distributor results") - - battery_status_channel = Broadcast[BatteryStatus]("battery_status") - async with PowerDistributingActor( - requests_receiver=requests_channel.new_receiver(), - results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), - ): - request = Request( - power=Power.from_kilowatts(1.2), - batteries=batteries, - request_timeout=SAFETY_TIMEOUT, - include_broken_batteries=True, - ) - - result_rx = results_channel.new_receiver() - - async def test_result() -> None: - done, pending = await asyncio.wait( - [asyncio.create_task(result_rx.receive())], - timeout=SAFETY_TIMEOUT.total_seconds(), - ) - assert len(pending) == 0 - assert len(done) == 1 - result: Result = done.pop().result() - assert isinstance(result, Success) - assert result.succeeded_batteries == batteries - assert result.succeeded_power.isclose(Power.from_kilowatts(1.2)) - assert result.excess_power.isclose(Power.zero(), abs_tol=1e-9) - assert result.request == request - - batteries_data = ( - battery_msg( - 9, - soc=Metric(math.nan, Bound(20, 80)), - capacity=Metric(98000), - power=PowerBounds(-1000, 0, 0, 1000), - ), - battery_msg( - 19, - soc=Metric(40, Bound(20, 80)), - capacity=Metric(math.nan), - power=PowerBounds(-1000, 0, 0, 1000), - ), - battery_msg( - 29, - soc=Metric(40, Bound(20, 80)), - capacity=Metric(float(98000)), - power=PowerBounds(math.nan, 0, 0, math.nan), - ), - ) - - # This request is needed to set the battery metrics cache to have valid - # metrics so that the distribution algorithm can be used in the next - # request where the batteries report NaN in the metrics. - await requests_channel.new_sender().send(request) - await test_result() - - for battery in batteries_data: - await mockgrid.mock_client.send(battery) - - await requests_channel.new_sender().send(request) - await test_result()