From 7b4cef2abc2baee2cb097485df45cccad615a818 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 29 Oct 2024 14:17:45 +0100 Subject: [PATCH 1/2] Support disabling of receiver-overflow warnings This is required in cases where users are only interested in the latest value, for example. Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 96465e36..c1017cd7 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -267,7 +267,7 @@ def new_sender(self) -> Sender[ChannelMessageT]: return _Sender(self) def new_receiver( - self, *, name: str | None = None, limit: int = 50 + self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True ) -> Receiver[ChannelMessageT]: """Return a new receiver attached to this channel. @@ -278,11 +278,15 @@ def new_receiver( Args: name: A name to identify the receiver in the logs. limit: Number of messages the receiver can hold in its buffer. + warn_on_overflow: Whether to log a warning when the receiver's + buffer is full and a message is dropped. Returns: A new receiver attached to this channel. """ - recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit) + recv: _Receiver[ChannelMessageT] = _Receiver( + self, name=name, limit=limit, warn_on_overflow=warn_on_overflow + ) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) @@ -371,7 +375,13 @@ class _Receiver(Receiver[_T]): """ def __init__( - self, channel: Broadcast[_T], /, *, name: str | None, limit: int + self, + channel: Broadcast[_T], + /, + *, + name: str | None, + limit: int, + warn_on_overflow: bool, ) -> None: """Initialize this receiver. @@ -387,7 +397,11 @@ def __init__( purposes, it will be shown in the string representation of the receiver. limit: Number of messages the receiver can hold in its buffer. + warn_on_overflow: Whether to log a warning when the receiver's + buffer is full and a message is dropped. """ + self._warn_on_overflow: bool = warn_on_overflow + self._name: str = name if name is not None else f"{id(self):_}" """The name to identify the receiver. @@ -412,10 +426,11 @@ def enqueue(self, message: _T, /) -> None: """ if len(self._q) == self._q.maxlen: self._q.popleft() - _logger.warning( - "Broadcast receiver [%s] is full. Oldest message was dropped.", - self, - ) + if self._warn_on_overflow: + _logger.warning( + "Broadcast receiver [%s] is full. Oldest message was dropped.", + self, + ) self._q.append(message) def __len__(self) -> int: From 8472db2d07ffb652f64d0e7ba6a45432c530b009 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 29 Oct 2024 14:22:13 +0100 Subject: [PATCH 2/2] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..41a0872e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +- Added support for disabling the overflow-warning log messagess in broadcast receivers, through the `warn_on_overflow` parameter on `Broadcast.new_receiver()` calls. ## Bug Fixes