Skip to content

Commit

Permalink
Support for disabling overflow-warning logs in broadcast receivers (#329
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shsms authored Dec 9, 2024
2 parents 718ce9d + 8472db2 commit 565084d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Added support for disabling the overflow-warning log messagess in broadcast receivers, through the `warn_on_overflow` parameter on `Broadcast.new_receiver()` calls.

## Bug Fixes

Expand Down
29 changes: 22 additions & 7 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def new_sender(self) -> Sender[ChannelMessageT]:
return _Sender(self)

def new_receiver(
self, *, name: str | None = None, limit: int = 50
self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True
) -> Receiver[ChannelMessageT]:
"""Return a new receiver attached to this channel.
Expand All @@ -278,11 +278,15 @@ def new_receiver(
Args:
name: A name to identify the receiver in the logs.
limit: Number of messages the receiver can hold in its buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.
Returns:
A new receiver attached to this channel.
"""
recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit)
recv: _Receiver[ChannelMessageT] = _Receiver(
self, name=name, limit=limit, warn_on_overflow=warn_on_overflow
)
self._receivers[hash(recv)] = weakref.ref(recv)
if self.resend_latest and self._latest is not None:
recv.enqueue(self._latest)
Expand Down Expand Up @@ -371,7 +375,13 @@ class _Receiver(Receiver[_T]):
"""

def __init__(
self, channel: Broadcast[_T], /, *, name: str | None, limit: int
self,
channel: Broadcast[_T],
/,
*,
name: str | None,
limit: int,
warn_on_overflow: bool,
) -> None:
"""Initialize this receiver.
Expand All @@ -387,7 +397,11 @@ def __init__(
purposes, it will be shown in the string representation of the
receiver.
limit: Number of messages the receiver can hold in its buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.
"""
self._warn_on_overflow: bool = warn_on_overflow

self._name: str = name if name is not None else f"{id(self):_}"
"""The name to identify the receiver.
Expand All @@ -412,10 +426,11 @@ def enqueue(self, message: _T, /) -> None:
"""
if len(self._q) == self._q.maxlen:
self._q.popleft()
_logger.warning(
"Broadcast receiver [%s] is full. Oldest message was dropped.",
self,
)
if self._warn_on_overflow:
_logger.warning(
"Broadcast receiver [%s] is full. Oldest message was dropped.",
self,
)
self._q.append(message)

def __len__(self) -> int:
Expand Down

0 comments on commit 565084d

Please sign in to comment.