diff --git a/Cargo.toml b/Cargo.toml
index 2334a4242..132f6d5d3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,8 @@ prost = { version = "0.11", optional = true }
 bytes = { version = "1.0", optional = true }
 log = "0.4"
 parking_lot = "0.12"
+prometheus = { version = "0.13", default-features = false, optional = true }
+lazy_static = "1"
 
 [workspace]
 members = [
@@ -42,8 +44,9 @@ members = [
 exclude = ["xtask"]
 
 [features]
-default = ["protobuf-codec", "boringssl"]
+default = ["protobuf-codec", "boringssl", "prometheus"]
 _secure = []
+prometheus = ["dep:prometheus"]
 protobuf-codec = ["protobuf"]
 protobufv3-codec = ["protobufv3"]
 prost-codec = ["prost", "bytes"]
diff --git a/grpc-sys/build.rs b/grpc-sys/build.rs
index 832a3a384..dd99fecf1 100644
--- a/grpc-sys/build.rs
+++ b/grpc-sys/build.rs
@@ -300,7 +300,7 @@ fn build_grpc(cc: &mut cc::Build, library: &str) {
 
 fn figure_systemd_path(build_dir: &str) {
     let path = format!("{build_dir}/CMakeCache.txt");
-    let f = BufReader::new(std::fs::File::open(&path).unwrap());
+    let f = BufReader::new(std::fs::File::open(path).unwrap());
     let mut libdir: Option<String> = None;
     let mut libname: Option<String> = None;
     for l in f.lines() {
diff --git a/grpc-sys/src/lib.rs b/grpc-sys/src/lib.rs
index dc0699c06..c39fcbf72 100644
--- a/grpc-sys/src/lib.rs
+++ b/grpc-sys/src/lib.rs
@@ -10,4 +10,5 @@ mod bindings {
 mod grpc_wrap;
 
 pub use bindings::*;
+#[allow(unused_imports)]
 pub use grpc_wrap::*;
diff --git a/src/env.rs b/src/env.rs
index 2cc2216ab..bd6281453 100644
--- a/src/env.rs
+++ b/src/env.rs
@@ -1,21 +1,138 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
+#[cfg(feature = "prometheus")]
+use prometheus::{local::LocalHistogram, IntCounter};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc;
 use std::sync::Arc;
 use std::thread::{Builder as ThreadBuilder, JoinHandle};
 
-use crate::grpc_sys;
-
 use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
+use crate::grpc_sys;
 use crate::task::CallTag;
 
-// event loop
+#[cfg(feature = "prometheus")]
+use {
+    crate::metrics::{
+        GRPC_POOL_CQ_NEXT_DURATION, GRPC_POOL_EVENT_COUNT_VEC, GRPC_POOL_EXECUTE_DURATION,
+        GRPC_TASK_WAIT_DURATION,
+    },
+    crate::task::resolve,
+    std::time::Instant,
+};
+
+#[allow(dead_code)]
+const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s, in milliseconds
+
+#[cfg(feature = "prometheus")]
+pub struct GRPCRunner {
+    cq_next_duration_his: LocalHistogram,
+    execute_duration_his: LocalHistogram,
+    wait_duration_his: LocalHistogram,
+    event_counter: [IntCounter; 6],
+    last_flush_time: Instant,
+}
+
+#[cfg(feature = "prometheus")]
+impl GRPCRunner {
+    pub fn new(name: &str) -> GRPCRunner {
+        let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION
+            .with_label_values(&[name])
+            .local();
+        let execute_duration_his = GRPC_POOL_EXECUTE_DURATION
+            .with_label_values(&[name])
+            .local();
+        let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]).local();
+        let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
+            .map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event]));
+        GRPCRunner {
+            cq_next_duration_his,
+            execute_duration_his,
+            wait_duration_his,
+            event_counter,
+            last_flush_time: Instant::now(),
+        }
+    }
+
+    // event loop
+    pub fn run(&mut self, tx: mpsc::Sender<CompletionQueue>) {
+        let cq = Arc::new(CompletionQueueHandle::new());
+        let worker_info = Arc::new(WorkQueue::new());
+        let cq = CompletionQueue::new(cq, worker_info);
+        tx.send(cq.clone()).expect("send back completion queue");
+        loop {
+            let now = Instant::now();
+            let e = cq.next();
+            self.cq_next_duration_his
+                .observe(now.elapsed().as_secs_f64());
+            let now = Instant::now();
+            match e.type_ {
+                EventType::GRPC_QUEUE_SHUTDOWN => break,
+                // timeout should not happen in theory.
+                EventType::GRPC_QUEUE_TIMEOUT => continue,
+                EventType::GRPC_OP_COMPLETE => {}
+            }
+
+            let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
+            self.resolve(tag, &cq, e.success != 0);
+            while let Some(work) = unsafe { cq.worker.pop_work() } {
+                work.finish();
+            }
+            self.execute_duration_his
+                .observe(now.elapsed().as_secs_f64());
+            self.maybe_flush();
+        }
+    }
+
+    fn maybe_flush(&mut self) {
+        let now = Instant::now();
+        if now.saturating_duration_since(self.last_flush_time)
+            < std::time::Duration::from_millis(METRICS_FLUSH_INTERVAL)
+        {
+            return;
+        }
+        self.last_flush_time = now;
+        self.cq_next_duration_his.flush();
+        self.execute_duration_his.flush();
+        self.wait_duration_his.flush();
+    }
+
+    fn resolve(&self, tag: Box<CallTag>, cq: &CompletionQueue, success: bool) {
+        match *tag {
+            CallTag::Batch(prom) => {
+                self.event_counter[0].inc();
+                prom.resolve(success)
+            }
+            CallTag::Request(cb) => {
+                self.event_counter[1].inc();
+                cb.resolve(cq, success)
+            }
+            CallTag::UnaryRequest(cb) => {
+                self.event_counter[2].inc();
+                cb.resolve(cq, success)
+            }
+            CallTag::Abort(_) => self.event_counter[3].inc(),
+            CallTag::Action(prom) => {
+                self.event_counter[4].inc();
+                prom.resolve(success)
+            }
+            CallTag::Spawn(task) => {
+                self.event_counter[5].inc();
+                self.wait_duration_his
+                    .observe(task.reset_push_time().elapsed().as_secs_f64());
+                resolve(task, success)
+            }
+        }
+    }
+}
+
+#[cfg(not(feature = "prometheus"))]
 fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
     let cq = Arc::new(CompletionQueueHandle::new());
     let worker_info = Arc::new(WorkQueue::new());
     let cq = CompletionQueue::new(cq, worker_info);
     tx.send(cq.clone()).expect("send back completion queue");
+
     loop {
         let e = cq.next();
         match e.type_ {
@@ -24,9 +141,7 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
             EventType::GRPC_QUEUE_TIMEOUT => continue,
             EventType::GRPC_OP_COMPLETE => {}
         }
-
         let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
-
         tag.resolve(&cq, e.success != 0);
         while let Some(work) = unsafe { cq.worker.pop_work() } {
             work.finish();
@@ -94,9 +209,13 @@ impl EnvBuilder {
         for i in 0..self.cq_count {
             let tx_i = tx.clone();
             let mut builder = ThreadBuilder::new();
-            if let Some(ref prefix) = self.name_prefix {
-                builder = builder.name(format!("{prefix}-{i}"));
-            }
+            let name = self
+                .name_prefix
+                .as_ref()
+                .map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}"));
+            #[cfg(feature = "prometheus")]
+            let mut runner = GRPCRunner::new(&name);
+            builder = builder.name(name);
             let after_start = self.after_start.clone();
             let before_stop = self.before_stop.clone();
             let handle = builder
@@ -104,6 +223,9 @@
                     if let Some(f) = after_start {
                         f();
                     }
+                    #[cfg(feature = "prometheus")]
+                    runner.run(tx_i);
+                    #[cfg(not(feature = "prometheus"))]
                     poll_queue(tx_i);
                     if let Some(f) = before_stop {
                         f();
diff --git a/src/lib.rs b/src/lib.rs
index c8cebcac5..c5c6a55f2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,6 +40,8 @@ mod env;
 mod error;
 mod log_util;
 mod metadata;
+#[cfg(feature = "prometheus")]
+mod metrics;
 mod quota;
 mod security;
 mod server;
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 000000000..ce4b85a66
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,41 @@
+// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! Metrics of the grpc pool.
+use lazy_static::lazy_static;
+use prometheus::*;
+lazy_static! {
+    /// Grpc wait duration of one task.
+    pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
+        "grpc_task_wait_duration",
+        "Bucketed histogram of grpc wait time, only for Spawn tasks",
+        &["name"],
+        exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
+    )
+    .unwrap();
+
+    /// Grpc pool io (completion queue next) duration.
+    pub static ref GRPC_POOL_CQ_NEXT_DURATION: HistogramVec = register_histogram_vec!(
+        "grpc_pool_cq_next_duration",
+        "Bucketed histogram of grpc pool wait duration from the completion queue",
+        &["name"],
+        exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
+    )
+    .unwrap();
+
+    /// Grpc pool execute duration.
+    pub static ref GRPC_POOL_EXECUTE_DURATION: HistogramVec = register_histogram_vec!(
+        "grpc_pool_execute_duration",
+        "Bucketed histogram of grpc pool execute duration for each completion queue event",
+        &["name"],
+        exponential_buckets(1e-7, 2.0, 30).unwrap() // 100ns ~ 100s
+    )
+    .unwrap();
+
+    /// Grpc pool event task count.
+    pub static ref GRPC_POOL_EVENT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
+        "grpc_pool_event_task_count",
+        "Total event task count in grpc pool",
+        &["name", "event"]
+    )
+    .unwrap();
+}
diff --git a/src/task/executor.rs b/src/task/executor.rs
index 3941005e7..0d2df0136 100644
--- a/src/task/executor.rs
+++ b/src/task/executor.rs
@@ -7,7 +7,7 @@
 //! Apparently, to minimize context switch, it's better to bind the future to the
 //! same completion queue as its inner call. Hence method `Executor::spawn` is provided.
 
-use std::cell::UnsafeCell;
+use std::cell::{RefCell, UnsafeCell};
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU8, Ordering};
@@ -21,6 +21,7 @@ use crate::call::Call;
 use crate::cq::{CompletionQueue, WorkQueue};
 use crate::error::{Error, Result};
 use crate::grpc_sys::{self, grpc_call_error};
+use std::time::Instant;
 
 /// A handle to a `Spawn`.
 /// Inner future is expected to be polled in the same thread as cq.
@@ -88,6 +89,7 @@ pub struct SpawnTask {
     state: AtomicU8,
     kicker: Kicker,
     queue: Arc<WorkQueue>,
+    push_time: RefCell<Instant>,
 }
 
 /// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
@@ -102,9 +104,14 @@ impl SpawnTask {
             state: AtomicU8::new(IDLE),
             kicker,
             queue,
+            push_time: RefCell::new(Instant::now()),
         }
     }
 
+    pub fn reset_push_time(&self) -> Instant {
+        self.push_time.replace(Instant::now())
+    }
+
     /// Marks the state of this task to NOTIFIED.
     ///
     /// Returns true means the task was IDLE, needs to be scheduled.
@@ -154,6 +161,7 @@ impl ArcWake for SpawnTask {
         // It can lead to deadlock if poll the future immediately. So we need to
         // defer the work instead.
+        task.reset_push_time();
         if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
             match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
                 // If the queue is shutdown, then the tag will be notified
diff --git a/src/task/mod.rs b/src/task/mod.rs
index d1827fca2..4bb8d5ac5 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -10,6 +10,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
 
+use crate::cq::CompletionQueue;
 use parking_lot::Mutex;
 
 use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
@@ -17,11 +18,13 @@ use self::executor::SpawnTask;
 use self::promise::{Action as ActionPromise, Batch as BatchPromise};
 use crate::call::server::RequestContext;
 use crate::call::{BatchContext, Call};
-use crate::cq::CompletionQueue;
 use crate::error::{Error, Result};
 use crate::server::RequestCallContext;
 
 pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
+
+#[cfg(feature = "prometheus")]
+pub(crate) use self::executor::resolve;
 pub(crate) use self::promise::BatchResult;
 pub use self::promise::BatchType;
 
@@ -170,7 +173,8 @@ impl CallTag {
         }
     }
 
     /// Resolve the CallTag with given status.
+    #[allow(dead_code)]
     pub fn resolve(self, cq: &CompletionQueue, success: bool) {
         match self {
             CallTag::Batch(prom) => prom.resolve(success),
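
For reference (not part of the patch): the new collectors are created with register_histogram_vec!/register_int_counter_vec!, so they land in prometheus's default registry, and any application built with the "prometheus" feature (on by default after this change) can expose them with a plain gather(). Below is a minimal sketch, assuming the crate is consumed as grpcio and using an arbitrary "example" thread-name prefix and cq_count of 2.

use std::sync::Arc;

use grpcio::EnvBuilder;
use prometheus::{Encoder, TextEncoder};

fn main() {
    // Building the environment spawns the completion-queue threads; with the
    // "prometheus" feature each thread creates its per-thread local metrics,
    // labeled "example-0", "example-1", ... ("grpc-pool-<i>" when no prefix is set).
    let _env = Arc::new(EnvBuilder::new().cq_count(2).name_prefix("example").build());

    // ... drive some gRPC traffic here so the pool records samples ...
    // The histograms are LocalHistograms that flush to the shared collectors
    // roughly every METRICS_FLUSH_INTERVAL (10s), so fresh samples may lag.

    // Scrape everything from the default registry in the text exposition format.
    let metric_families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new().encode(&metric_families, &mut buf).unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}

Note that the event counters (grpc_pool_event_task_count) are plain IntCounters updated in place, so they are visible immediately; only the three histograms go through the local-then-flush path.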