metrics: add running metrics for grpc #639

Open · wants to merge 7 commits into base: master

Changes from 3 commits
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -27,6 +27,8 @@ prost = { version = "0.11", optional = true }
bytes = { version = "1.0", optional = true }
log = "0.4"
parking_lot = "0.12"
prometheus = { version = "0.13", default-features = false }
Contributor
Suggested change
prometheus = { version = "0.13", default-features = false }
prometheus = { version = "0.13", default-features = false, optional = true }

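If the dependency does become optional, the code that uses it needs gating as well. A minimal sketch of what that could look like (assuming a hypothetical `prometheus` feature wired up as, e.g., `prometheus = ["dep:prometheus"]` under `[features]`; none of this is in the diff):

// src/lib.rs: compile the metrics module only when the feature is enabled.
#[cfg(feature = "prometheus")]
mod metrics;
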
lazy_static = "1"

[workspace]
members = [
22 changes: 20 additions & 2 deletions src/env.rs
@@ -5,19 +5,36 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{Builder as ThreadBuilder, JoinHandle};

use crate::grpc_sys;
use crate::{grpc_sys, metrics};

use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
use crate::metrics::{
GRPC_POOL_EXECUTE_DURATION, GRPC_POOL_IO_HANDLE_DURATION, GRPC_TASK_WAIT_DURATION,
};
use crate::task::CallTag;
use std::time::Instant;

// event loop
fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");
let name = std::thread::current()
.name()
.unwrap_or("unknown")
.to_owned();
let grpc_pool_io_handle_duration = GRPC_POOL_IO_HANDLE_DURATION.with_label_values(&[&name]);
let grpc_pool_execute_duration = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[&name]);
let grpc_event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
.map(|event| metrics::GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[&name, event]));
let grpc_task_wait_duration = GRPC_TASK_WAIT_DURATION.with_label_values(&[&name]);

loop {
let now = Instant::now();
let e = cq.next();
grpc_pool_io_handle_duration.observe(now.elapsed().as_secs_f64());
let now = Instant::now();
match e.type_ {
EventType::GRPC_QUEUE_SHUTDOWN => break,
// timeout should not happen in theory.
@@ -26,11 +43,12 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };

tag.report(&grpc_event_counter, &grpc_task_wait_duration);
tag.resolve(&cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
}
grpc_pool_execute_duration.observe(now.elapsed().as_secs_f64());
}
}

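As an aside (not part of this diff), the collectors above register themselves with the prometheus crate's default registry, so an embedding process can export everything through the crate's standard gather/encode path. A minimal sketch:

use prometheus::{Encoder, TextEncoder};

// Renders every metric registered via register_histogram_vec! /
// register_int_counter_vec! in the Prometheus text exposition format.
fn dump_metrics() -> String {
    let metric_families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buf)
        .expect("text encoding should not fail");
    String::from_utf8(buf).expect("text exposition format is valid utf-8")
}
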
1 change: 1 addition & 0 deletions src/lib.rs
@@ -40,6 +40,7 @@ mod env;
mod error;
mod log_util;
mod metadata;
mod metrics;
mod quota;
mod security;
mod server;
43 changes: 43 additions & 0 deletions src/metrics.rs
@@ -0,0 +1,43 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

//! Metrics of the grpc pool.

use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
/// Grpc wait duration of one task.
pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_task_wait_duration",
"Bucketed histogram of grpc wait time only for Spawn task",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

/// Grpc pool I/O handle duration.
pub static ref GRPC_POOL_IO_HANDLE_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_io_handle_duration",
"Bucketed histogram of time spent waiting on the completion queue",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 52ms
)
.unwrap();

/// Grpc pool execute duration.
pub static ref GRPC_POOL_EXECUTE_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_execute_duration",
"Bucketed histogram of grpc pool execute duration per event-loop iteration",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 52ms
Contributor
The ~52ms upper bound is too small here; maybe setting the range to 10us~10s would be better.
)
.unwrap();

/// Grpc pool event task count.
pub static ref GRPC_POOL_EVENT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"grpc_pool_event_task_count",
"Total event task count in grpc pool",
&["name","event"]
)
.unwrap();
}
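On the bucket-range discussion above: exponential_buckets(start, factor, count) returns count boundaries, start * factor^0 through start * factor^(count - 1), so the settings in this file top out at 1e-7 * 2^19 ≈ 52ms, and the suggested 10us lower bound corresponds to a start of 1e-5. A quick illustrative check (not part of the diff):

use prometheus::exponential_buckets;

fn main() {
    // Buckets as registered above: 100ns, 200ns, ... topping out near 52ms.
    let current = exponential_buckets(1e-7, 2.0, 20).unwrap();
    assert!((current[19] - 0.0524288).abs() < 1e-9);

    // A 10us start reaches ~5.2s with the same factor and count; hitting a
    // full 10s upper bound would take one more bucket (count = 21).
    let suggested = exponential_buckets(1e-5, 2.0, 20).unwrap();
    assert!((suggested[19] - 5.24288).abs() < 1e-9);
}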
10 changes: 9 additions & 1 deletion src/task/executor.rs
@@ -7,7 +7,7 @@
//! Apparently, to minimize context switch, it's better to bind the future to the
//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.

use std::cell::UnsafeCell;
use std::cell::{RefCell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
@@ -21,6 +21,7 @@ use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};
use std::time::Instant;

/// A handle to a `Spawn`.
/// Inner future is expected to be polled in the same thread as cq.
@@ -88,6 +89,7 @@ pub struct SpawnTask {
state: AtomicU8,
kicker: Kicker,
queue: Arc<WorkQueue>,
push_time: RefCell<Instant>,
}

/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
@@ -102,9 +104,14 @@ impl SpawnTask {
state: AtomicU8::new(IDLE),
kicker,
queue,
push_time: RefCell::new(Instant::now()),
}
}

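/// Replaces the recorded push time with `Instant::now()` and returns the
/// previous value, i.e. the moment this task was last pushed onto the queue.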
pub fn reset_push_time(&self) -> Instant {
self.push_time.replace(Instant::now())
}

/// Marks the state of this task to NOTIFIED.
///
/// Returns `true` if the task was IDLE and needs to be scheduled.
@@ -154,6 +161,7 @@ impl ArcWake for SpawnTask {

// Polling the future immediately could lead to a deadlock, so we defer
// the work instead.
task.reset_push_time();
if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
// If the queue is shutdown, then the tag will be notified
16 changes: 16 additions & 0 deletions src/task/mod.rs
@@ -11,6 +11,8 @@ use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use parking_lot::Mutex;
use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::Histogram;

use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::executor::SpawnTask;
@@ -181,6 +183,20 @@ impl CallTag {
CallTag::Spawn(notify) => self::executor::resolve(notify, success),
}
}

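/// Bumps the counter that matches this tag's variant and, for spawned tasks,
/// observes how long the task sat in the queue. The counter slots must stay
/// in sync with the event label order built in `env::poll_queue`.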
pub fn report(&self, counter: &[GenericCounter<AtomicU64>; 6], wait_his: &Histogram) {
match self {
CallTag::Batch(_) => counter[0].inc(),
CallTag::Request(_) => counter[1].inc(),
CallTag::UnaryRequest(_) => counter[2].inc(),
CallTag::Abort(_) => counter[3].inc(),
CallTag::Action(_) => counter[4].inc(),
CallTag::Spawn(task) => {
counter[5].inc();
wait_his.observe(task.reset_push_time().elapsed().as_secs_f64());
}
}
}
}

impl Debug for CallTag {