Skip to content

Commit

Permalink
Merge branch 'master' into 7077-unboundedSender-len-method_implementa…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
Archibajl authored Jan 15, 2025
2 parents 4ede5b9 + a82bdee commit 35645a0
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 10 deletions.
13 changes: 5 additions & 8 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,17 +825,14 @@ impl<T> Sender<T> {
/// let (tx, mut rx1) = broadcast::channel::<u32>(16);
/// let mut rx2 = tx.subscribe();
///
/// tokio::spawn(async move {
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// });
///
/// let _ = tx.send(10);
/// assert!(tx.closed().now_or_never().is_none());
///
/// let _ = tokio::spawn(async move {
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// }).await;
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(tx.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());
/// }
/// ```
Expand Down
28 changes: 26 additions & 2 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,34 @@ impl<T, S: Semaphore> Drop for Rx<T, S> {

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
struct Guard<'a, T, S: Semaphore> {
list: &'a mut list::Rx<T>,
tx: &'a list::Tx<T>,
sem: &'a S,
}

impl<'a, T, S: Semaphore> Guard<'a, T, S> {
fn drain(&mut self) {
// call T's destructor.
while let Some(Value(_)) = self.list.pop(self.tx) {
self.sem.add_permit();
}
}
}

while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
self.drain();
}
}

let mut guard = Guard {
list: &mut rx_fields.list,
tx: &self.inner.tx,
sem: &self.inner.semaphore,
};

guard.drain();
});
}
}
Expand Down
46 changes: 46 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() {
}
}

#[test]
#[cfg(not(panic = "abort"))]
fn drop_all_elements_during_panic() {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;

static COUNTER: AtomicUsize = AtomicUsize::new(0);

struct A(bool);
impl Drop for A {
// cause a panic when inner value is `true`.
fn drop(&mut self) {
COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if self.0 {
panic!("panic!")
}
}
}

fn func(tx: UnboundedSender<A>, rx: UnboundedReceiver<A>) {
tx.send(A(true)).unwrap();
tx.send(A(false)).unwrap();
tx.send(A(false)).unwrap();

drop(rx);

// `mpsc::Rx`'s drop is called and gets panicked while dropping the first value,
// but will keep dropping following elements.
}

let (tx, rx) = mpsc::unbounded_channel();

let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(tx.clone(), rx);
}));

// all A's destructor should be called at this point, even before `mpsc::Chan`'s
// drop gets called.
assert_eq!(COUNTER.load(Relaxed), 3);

drop(tx);
// `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation.
}

fn is_debug<T: fmt::Debug>(_: &T) {}

0 comments on commit 35645a0

Please sign in to comment.