From 4c0c320eedc6d04c35e77ff41ec86cb786630894 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 28 Jun 2024 14:54:44 +0200 Subject: [PATCH 1/3] Fix typos in `_Mapper` method docstrings Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_receiver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frequenz/channels/_receiver.py b/src/frequenz/channels/_receiver.py index 8e30d56c..4704529e 100644 --- a/src/frequenz/channels/_receiver.py +++ b/src/frequenz/channels/_receiver.py @@ -336,9 +336,9 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502 ) # pylint: disable=protected-access def __str__(self) -> str: - """Return a string representation of the timer.""" + """Return a string representation of the mapper.""" return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}" def __repr__(self) -> str: - """Return a string representation of the timer.""" + """Return a string representation of the mapper.""" return f"{type(self).__name__}({self._receiver!r}, {self._mapping_function!r})" From 44fc2d9ac42014cd59e4969bd0ce97082e1bfff7 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 27 Jun 2024 15:05:00 +0200 Subject: [PATCH 2/3] Support filtering the messages on a receiver Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_receiver.py | 110 +++++++++++++++++++++++++++++ tests/test_anycast.py | 17 +++++ tests/test_broadcast.py | 17 +++++ 3 files changed, 144 insertions(+) diff --git a/src/frequenz/channels/_receiver.py b/src/frequenz/channels/_receiver.py index 4704529e..dbb533f3 100644 --- a/src/frequenz/channels/_receiver.py +++ b/src/frequenz/channels/_receiver.py @@ -242,6 +242,25 @@ def map( """ return _Mapper(receiver=self, mapping_function=mapping_function) + def filter( + self, filter_function: Callable[[ReceiverMessageT_co], bool], / + ) -> Receiver[ReceiverMessageT_co]: + """Apply a filter function on the messages on a receiver. + + Tip: + The returned receiver type won't have all the methods of the original + receiver. If you need to access methods of the original receiver that are + not part of the `Receiver` interface you should save a reference to the + original receiver and use that instead. + + Args: + filter_function: The function to be applied on incoming messages. + + Returns: + A new receiver that applies the function on the received messages. + """ + return _Filter(receiver=self, filter_function=filter_function) + class ReceiverError(Error, Generic[ReceiverMessageT_co]): """An error that originated in a [Receiver][frequenz.channels.Receiver]. @@ -342,3 +361,94 @@ def __str__(self) -> str: def __repr__(self) -> str: """Return a string representation of the mapper.""" return f"{type(self).__name__}({self._receiver!r}, {self._mapping_function!r})" + + +class _Sentinel: + """A sentinel object to represent no value received yet.""" + + def __str__(self) -> str: + """Return a string representation of this sentinel.""" + return "" + + def __repr__(self) -> str: + """Return a string representation of this sentinel.""" + return "" + + +_SENTINEL = _Sentinel() + + +class _Filter(Receiver[ReceiverMessageT_co], Generic[ReceiverMessageT_co]): + """Apply a filter function on the messages on a receiver.""" + + def __init__( + self, + *, + receiver: Receiver[ReceiverMessageT_co], + filter_function: Callable[[ReceiverMessageT_co], bool], + ) -> None: + """Initialize this receiver filter. + + Args: + receiver: The input receiver. + filter_function: The function to apply on the input data. + """ + self._receiver: Receiver[ReceiverMessageT_co] = receiver + """The input receiver.""" + + self._filter_function: Callable[[ReceiverMessageT_co], bool] = filter_function + """The function to apply on the input data.""" + + self._next_message: ReceiverMessageT_co | _Sentinel = _SENTINEL + + self._recv_closed = False + + async def ready(self) -> bool: + """Wait until the receiver is ready with a message or an error. + + Once a call to `ready()` has finished, the message should be read with + a call to `consume()` (`receive()` or iterated over). The receiver will + remain ready (this method will return immediately) until it is + consumed. + + Returns: + Whether the receiver is still active. + """ + while await self._receiver.ready(): + message = self._receiver.consume() + if self._filter_function(message): + self._next_message = message + return True + self._recv_closed = True + return False + + def consume(self) -> ReceiverMessageT_co: + """Return a transformed message once `ready()` is complete. + + Returns: + The next message that was received. + + Raises: + ReceiverStoppedError: If the receiver stopped producing messages. + ReceiverError: If there is a problem with the receiver. + """ + if self._recv_closed: + raise ReceiverStoppedError(self) + assert not isinstance( + self._next_message, _Sentinel + ), "`consume()` must be preceded by a call to `ready()`" + + message = self._next_message + self._next_message = _SENTINEL + return message + + def __str__(self) -> str: + """Return a string representation of the filter.""" + return f"{type(self).__name__}:{self._receiver}:{self._filter_function}" + + def __repr__(self) -> str: + """Return a string representation of the filter.""" + return ( + f"<{type(self).__name__} receiver={self._receiver!r} " + f"filter={self._filter_function!r} next_message={self._next_message!r}>" + ) diff --git a/tests/test_anycast.py b/tests/test_anycast.py index b6dca400..c6db0d9a 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -200,3 +200,20 @@ async def test_anycast_map() -> None: assert (await receiver.receive()) is False assert (await receiver.receive()) is True + + +async def test_anycast_filter() -> None: + """Ensure filter keeps only the messages that pass the filter.""" + chan = Anycast[int](name="input-chan") + sender = chan.new_sender() + + # filter out all numbers less than 10. + receiver: Receiver[int] = chan.new_receiver().filter(lambda num: num > 10) + + await sender.send(8) + await sender.send(12) + await sender.send(5) + await sender.send(15) + + assert (await receiver.receive()) == 12 + assert (await receiver.receive()) == 15 diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 3c8eff3e..f480d194 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -231,6 +231,23 @@ async def test_broadcast_map() -> None: assert (await receiver.receive()) is True +async def test_broadcast_filter() -> None: + """Ensure filter keeps only the messages that pass the filter.""" + chan = Broadcast[int](name="input-chan") + sender = chan.new_sender() + + # filter out all numbers less than 10. + receiver: Receiver[int] = chan.new_receiver().filter(lambda num: num > 10) + + await sender.send(8) + await sender.send(12) + await sender.send(5) + await sender.send(15) + + assert (await receiver.receive()) == 12 + assert (await receiver.receive()) == 15 + + async def test_broadcast_receiver_drop() -> None: """Ensure deleted receivers get cleaned up.""" chan = Broadcast[int](name="input-chan") From 068b04e2e221e60146f9be2a0b6d83e2cff1c2d3 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 27 Jun 2024 15:08:18 +0200 Subject: [PATCH 3/3] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index ffb81a3f..cea532dc 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,6 +16,8 @@ - **Experimental**: `Pipe`, which provides a pipe between two channels, by connecting a `Receiver` to a `Sender`. +- `Receiver`s now have a `filter` method that applies a filter function on the messages on a receiver. + ## Bug Fixes