From ed9b9eb9da09ba102b0ce06303e3ca7377a4a6b3 Mon Sep 17 00:00:00 2001 From: alexlapa <36732824+alexlapa@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:42:40 +0300 Subject: [PATCH] Fix RTX stops working after packet loss spike --- CHANGELOG.md | 1 + src/streams/send.rs | 3 +++ src/util/value_history.rs | 47 +++++++++++++++++++++++++++++++-------- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f557876a..545a4e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Ensure compatibility with some 32-bit targets #533 * Fix bug using unreliable channels by default #548 * New add_channel_with_config() for configured data channels #548 + * Fix RTX stops working after packet loss spike #566 # 0.6.1 * Force openssl to be >=0.10.66 #545 diff --git a/src/streams/send.rs b/src/streams/send.rs index 4b6dab41..c6c891b1 100644 --- a/src/streams/send.rs +++ b/src/streams/send.rs @@ -530,6 +530,9 @@ impl StreamTx { } // bytes stats refer to the last second by default + self.stats.bytes_transmitted.purge_old(now); + self.stats.bytes_retransmitted.purge_old(now); + let bytes_transmitted = self.stats.bytes_transmitted.sum(); let bytes_retransmitted = self.stats.bytes_retransmitted.sum(); let ratio = bytes_retransmitted as f32 / (bytes_retransmitted + bytes_transmitted) as f32; diff --git a/src/util/value_history.rs b/src/util/value_history.rs index 82ee486e..2feb0b03 100644 --- a/src/util/value_history.rs +++ b/src/util/value_history.rs @@ -35,23 +35,26 @@ where pub fn push(&mut self, t: Instant, v: T) { self.value += v; self.history.push_back((t, v)); - self.drain(t); } - /// Returns the sum of all values in the history up to max_time - /// This is more efficient than sum_since() as it does not need to iterate over the history + /// Returns the sum of all values in the history up to max_time. Might + /// return stale value unless [`ValueHistory::purge_old`] is called before. pub fn sum(&self) -> T { self.value } - fn drain(&mut self, t: Instant) -> Option<()> { - while t.duration_since(self.history.front()?.0) > self.max_time { + /// Recalculates sum purging values older than `now - max_time`. + pub fn purge_old(&mut self, now: Instant) { + while { + let Some(front_t) = self.history.front().map(|v| v.0) else { + return; + }; + now.duration_since(front_t) > self.max_time + } { if let Some((_, v)) = self.history.pop_front() { self.value -= v; } } - - Some(()) } } @@ -63,7 +66,7 @@ mod test { use super::ValueHistory; #[test] - fn test() { + fn with_value_test() { let now = Instant::now(); let mut h = ValueHistory { @@ -72,11 +75,37 @@ mod test { ..Default::default() }; + assert_eq!(h.sum(), 11); + h.purge_old(now); assert_eq!(h.sum(), 11); h.push(now - Duration::from_millis(1500), 22); h.push(now - Duration::from_millis(500), 22); - assert_eq!(h.sum(), 55); + assert_eq!(h.sum(), 11 + 22 + 22); + h.purge_old(now); + assert_eq!(h.sum(), 11 + 22); h.push(now, 0); + assert_eq!(h.sum(), 11 + 22); + } + + #[test] + fn test() { + let now = Instant::now(); + let mut h = ValueHistory::default(); + + assert_eq!(h.sum(), 0); + h.push(now - Duration::from_millis(1500), 22); + assert_eq!(h.sum(), 22); + h.purge_old(now); + assert_eq!(h.sum(), 0); + h.push(now - Duration::from_millis(700), 22); + h.push(now - Duration::from_millis(500), 33); + assert_eq!(h.sum(), 22 + 33); + h.purge_old(now); + assert_eq!(h.sum(), 22 + 33); + + h.purge_old(now + Duration::from_millis(400)); assert_eq!(h.sum(), 33); + h.purge_old(now + Duration::from_millis(600)); + assert_eq!(h.sum(), 0); } }