Skip to content

Commit

Permalink
add wait duration
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies committed Nov 22, 2023
1 parent a086c63 commit 9997865
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
8 changes: 6 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::thread::{Builder as ThreadBuilder, JoinHandle};
use crate::{grpc_sys, metrics};

use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
use crate::metrics::{GRPC_POOL_EXECUTE_DURATION, GRPC_POOL_IO_HANDLE_DURATION};
use crate::metrics::{
GRPC_POOL_EXECUTE_DURATION, GRPC_POOL_IO_HANDLE_DURATION, GRPC_TASK_WAIT_DURATION,
};
use crate::task::CallTag;
use std::time::Instant;

Expand All @@ -26,6 +28,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
let grpc_pool_execute_duration = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[&name]);
let grpc_event_counter = ["batch", "request", "unary", "stream", "finish"]
.map(|event| metrics::GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[&name, event]));
let grpc_task_wait_duration = GRPC_TASK_WAIT_DURATION.with_label_values(&[&name]);

loop {
let now = Instant::now();
let e = cq.next();
Expand All @@ -39,7 +43,7 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
tag.report(&grpc_event_counter);
tag.report(&grpc_event_counter, &grpc_task_wait_duration);
tag.resolve(&cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
Expand Down
9 changes: 9 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
/// Grpc wait duration of one task.
pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_task_wait_duration",
"Bucketed histogram of grpc wait time only for Spawn task",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

// Grpc pool io handle duration .
pub static ref GRPC_POOL_IO_HANDLE_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_io_handle_duration",
Expand Down
10 changes: 9 additions & 1 deletion src/task/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! Apparently, to minimize context switch, it's better to bind the future to the
//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.
use std::cell::UnsafeCell;
use std::cell::{RefCell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
Expand All @@ -21,6 +21,7 @@ use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};
use std::time::Instant;

/// A handle to a `Spawn`.
/// Inner future is expected to be polled in the same thread as cq.
Expand Down Expand Up @@ -88,6 +89,7 @@ pub struct SpawnTask {
state: AtomicU8,
kicker: Kicker,
queue: Arc<WorkQueue>,
push_time: RefCell<Instant>,
}

/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
Expand All @@ -102,9 +104,14 @@ impl SpawnTask {
state: AtomicU8::new(IDLE),
kicker,
queue,
push_time: RefCell::new(Instant::now()),
}
}

pub fn reset_push_time(&self) -> Instant {
self.push_time.replace(Instant::now())
}

/// Marks the state of this task to NOTIFIED.
///
/// Returns true means the task was IDLE, needs to be scheduled.
Expand Down Expand Up @@ -154,6 +161,7 @@ impl ArcWake for SpawnTask {

// It can lead to deadlock if poll the future immediately. So we need to
// defer the work instead.
task.reset_push_time();
if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
// If the queue is shutdown, then the tag will be notified
Expand Down
10 changes: 7 additions & 3 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::task::{Context, Poll, Waker};

use parking_lot::Mutex;
use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::Histogram;

use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::executor::SpawnTask;
Expand Down Expand Up @@ -183,14 +184,17 @@ impl CallTag {
}
}

pub fn report(&self, counter: &[GenericCounter<AtomicU64>; 5]) {
match *self {
pub fn report(&self, counter: &[GenericCounter<AtomicU64>; 5], wait_his: &Histogram) {
match self {
CallTag::Batch(_) => counter[0].inc(),
CallTag::Request(_) => counter[1].inc(),
CallTag::UnaryRequest(_) => counter[2].inc(),
CallTag::Abort(_) => counter[3].inc(),
CallTag::Action(_) => counter[4].inc(),
CallTag::Spawn(_) => counter[5].inc(),
CallTag::Spawn(task) => {
counter[5].inc();
wait_his.observe(task.reset_push_time().elapsed().as_secs_f64());
}
}
}
}
Expand Down

0 comments on commit 9997865

Please sign in to comment.