diff --git a/bb8/src/api.rs b/bb8/src/api.rs index efbd869..004790d 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -85,6 +85,20 @@ pub struct State { pub connections: u32, /// The number of idle connections. pub idle_connections: u32, + /// Statistics about the historical usage of the pool. + pub statistics: Statistics, +} + +/// Statistics about the historical usage of the `Pool`. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct Statistics { + /// Total gets performed that did not have to wait for a connection. + pub get_direct: u64, + /// Total gets performed that had to wait for a connection available. + pub get_waited: u64, + /// Total gets performed that timed out while waiting for a connection. + pub get_timed_out: u64, } /// A builder for a connection pool. diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 7ca6e20..91b43b0 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -10,7 +10,7 @@ use tokio::spawn; use tokio::time::{interval_at, sleep, timeout, Interval}; use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State}; -use crate::internals::{Approval, ApprovalIter, Conn, SharedPool}; +use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind}; pub(crate) struct PoolInner where @@ -85,6 +85,8 @@ where } pub(crate) async fn get(&self) -> Result, RunError> { + let mut kind = StatsKind::Direct; + let future = async { loop { let (conn, approvals) = self.inner.pop(); @@ -96,6 +98,7 @@ where let mut conn = match conn { Some(conn) => PooledConnection::new(self, conn), None => { + kind = StatsKind::Waited; self.inner.notify.notified().await; continue; } @@ -116,10 +119,16 @@ where } }; - match timeout(self.inner.statics.connection_timeout, future).await { + let result = match timeout(self.inner.statics.connection_timeout, future).await { Ok(result) => result, - _ => Err(RunError::TimedOut), - } + _ => { + kind = StatsKind::TimedOut; + Err(RunError::TimedOut) + } + }; + + self.inner.statistics.record(kind); + result } pub(crate) async fn connect(&self) -> Result { @@ -148,7 +157,10 @@ where /// Returns information about the current state of the pool. pub(crate) fn state(&self) -> State { - (&*self.inner.internals.lock()).into() + self.inner + .internals + .lock() + .state((&self.inner.statistics).into()) } // Outside of Pool to avoid borrow splitting issues on self diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 59dfe63..23b45f2 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -1,12 +1,13 @@ use std::cmp::min; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; -use crate::{api::QueueStrategy, lock::Mutex}; use tokio::sync::Notify; -use crate::api::{Builder, ManageConnection, State}; -use std::collections::VecDeque; +use crate::api::{Builder, ManageConnection, QueueStrategy, State, Statistics}; +use crate::lock::Mutex; /// The guts of a `Pool`. #[allow(missing_debug_implementations)] @@ -18,6 +19,7 @@ where pub(crate) manager: M, pub(crate) internals: Mutex>, pub(crate) notify: Arc, + pub(crate) statistics: AtomicStatistics, } impl SharedPool @@ -30,6 +32,7 @@ where manager, internals: Mutex::new(PoolInternals::default()), notify: Arc::new(Notify::new()), + statistics: AtomicStatistics::default(), } } @@ -153,14 +156,12 @@ where self.dropped((before - self.conns.len()) as u32, config) } -} -#[allow(clippy::from_over_into)] // Keep this more private with the internal type -impl Into for &PoolInternals { - fn into(self) -> State { + pub(crate) fn state(&self, statistics: Statistics) -> State { State { connections: self.num_conns, idle_connections: self.conns.len() as u32, + statistics, } } } @@ -248,3 +249,36 @@ impl From> for IdleConn { } } } + +#[derive(Default)] +pub(crate) struct AtomicStatistics { + pub(crate) get_direct: AtomicU64, + pub(crate) get_waited: AtomicU64, + pub(crate) get_timed_out: AtomicU64, +} + +impl From<&AtomicStatistics> for Statistics { + fn from(item: &AtomicStatistics) -> Self { + Self { + get_direct: item.get_direct.load(Ordering::SeqCst), + get_waited: item.get_waited.load(Ordering::SeqCst), + get_timed_out: item.get_timed_out.load(Ordering::SeqCst), + } + } +} + +impl AtomicStatistics { + pub(crate) fn record(&self, kind: StatsKind) { + match kind { + StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst), + StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst), + StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), + }; + } +} + +pub(crate) enum StatsKind { + Direct, + Waited, + TimedOut, +} diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 95b5182..ee32cad 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -885,3 +885,43 @@ async fn test_broken_connections_dont_starve_pool() { future.await.unwrap(); } } + +#[tokio::test] +async fn test_state_get_contention() { + let pool = Pool::builder() + .max_size(1) + .min_idle(1) + .build(OkManager::::new()) + .await + .unwrap(); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let clone = pool.clone(); + tokio::spawn(async move { + let conn = clone.get().await.unwrap(); + tx1.send(()).unwrap(); + let _ = rx2 + .then(|r| match r { + Ok(v) => ok((v, conn)), + Err(_) => err((Error, conn)), + }) + .await; + }); + + // Get the first connection. + rx1.await.unwrap(); + + // Now try to get a new connection without waiting. + let f = pool.get(); + + // Release the first connection. + tx2.send(()).unwrap(); + + // Wait for the second attempt to get a connection. + f.await.unwrap(); + + let statistics = pool.state().statistics; + assert_eq!(statistics.get_direct, 1); + assert_eq!(statistics.get_waited, 1); +}