metrics: add running metrics for grpc #639

Open · wants to merge 7 commits into base: master
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -27,6 +27,8 @@ prost = { version = "0.11", optional = true }
bytes = { version = "1.0", optional = true }
log = "0.4"
parking_lot = "0.12"
prometheus = { version = "0.13", default-features = false, optional=true}
lazy_static = "1"

[workspace]
members = [
@@ -42,8 +44,9 @@ members = [
exclude = ["xtask"]

[features]
default = ["protobuf-codec", "boringssl"]
default = ["protobuf-codec", "boringssl","prometheus"]
Contributor:
Maybe we should not enable the "prometheus" feature by default.

Author:
It may bring extra resource usage.

_secure = []
prometheus = ["dep:prometheus"]
protobuf-codec = ["protobuf"]
protobufv3-codec = ["protobufv3"]
prost-codec = ["prost", "bytes"]
2 changes: 1 addition & 1 deletion grpc-sys/build.rs
@@ -300,7 +300,7 @@ fn build_grpc(cc: &mut cc::Build, library: &str) {

fn figure_systemd_path(build_dir: &str) {
let path = format!("{build_dir}/CMakeCache.txt");
let f = BufReader::new(std::fs::File::open(&path).unwrap());
let f = BufReader::new(std::fs::File::open(path).unwrap());
let mut libdir: Option<String> = None;
let mut libname: Option<String> = None;
for l in f.lines() {
1 change: 1 addition & 0 deletions grpc-sys/src/lib.rs
@@ -10,4 +10,5 @@ mod bindings {
mod grpc_wrap;

pub use bindings::*;
#[allow(unused_imports)]
pub use grpc_wrap::*;
138 changes: 130 additions & 8 deletions src/env.rs
@@ -1,21 +1,138 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

#[cfg(feature = "prometheus")]
use prometheus::{local::LocalHistogram, IntCounter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{Builder as ThreadBuilder, JoinHandle};

use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
use crate::grpc_sys;
use crate::task::CallTag;

#[cfg(feature = "prometheus")]
use {
crate::metrics::{
GRPC_POOL_CQ_NEXT_DURATION, GRPC_POOL_EVENT_COUNT_VEC, GRPC_POOL_EXECUTE_DURATION,
GRPC_TASK_WAIT_DURATION,
},
crate::task::resolve,
std::time::Instant,
};

#[allow(dead_code)]
const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s

#[cfg(feature = "prometheus")]
pub struct GRPCRunner {
cq_next_duration_his: LocalHistogram,
execute_duration_his: LocalHistogram,
wait_duration_his: LocalHistogram,
event_counter: [IntCounter; 6],
last_flush_time: Instant,
}

#[cfg(feature = "prometheus")]
impl GRPCRunner {
pub fn new(name: &str) -> GRPCRunner {
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION
.with_label_values(&[name])
.local();
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION
.with_label_values(&[name])
.local();
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]).local();
let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
.map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event]));
GRPCRunner {
cq_next_duration_his,
execute_duration_his,
wait_duration_his,
event_counter,
last_flush_time: Instant::now(),
}
}

// event loop
pub fn run(&mut self, tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");
loop {
let now = Instant::now();
let e = cq.next();
self.cq_next_duration_his
.observe(now.elapsed().as_secs_f64());
let now = Instant::now();
match e.type_ {
EventType::GRPC_QUEUE_SHUTDOWN => break,
// timeout should not happen in theory.
EventType::GRPC_QUEUE_TIMEOUT => continue,
EventType::GRPC_OP_COMPLETE => {}
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
self.resolve(tag, &cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
}
self.execute_duration_his
.observe(now.elapsed().as_secs_f64());
self.maybe_flush();
}
}

fn maybe_flush(&mut self) {
let now = Instant::now();
if now.saturating_duration_since(self.last_flush_time)
< std::time::Duration::from_millis(METRICS_FLUSH_INTERVAL)
{
return;
}
self.last_flush_time = now;
self.cq_next_duration_his.flush();
self.execute_duration_his.flush();
self.wait_duration_his.flush();
}

fn resolve(&self, tag: Box<CallTag>, cq: &CompletionQueue, success: bool) {
match *tag {
CallTag::Batch(prom) => {
self.event_counter[0].inc();
prom.resolve(success)
}
CallTag::Request(cb) => {
self.event_counter[1].inc();
cb.resolve(cq, success)
}
CallTag::UnaryRequest(cb) => {
self.event_counter[2].inc();
cb.resolve(cq, success)
}
CallTag::Abort(_) => self.event_counter[3].inc(),
CallTag::Action(prom) => {
self.event_counter[4].inc();
prom.resolve(success)
}
CallTag::Spawn(task) => {
self.event_counter[5].inc();
self.wait_duration_his
.observe(task.reset_push_time().elapsed().as_secs_f64());
resolve(task, success)
}
}
}
}

#[cfg(not(feature = "prometheus"))]
fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");

loop {
let e = cq.next();
match e.type_ {
@@ -24,9 +141,7 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
EventType::GRPC_QUEUE_TIMEOUT => continue,
EventType::GRPC_OP_COMPLETE => {}
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };

tag.resolve(&cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
@@ -94,16 +209,23 @@ impl EnvBuilder {
for i in 0..self.cq_count {
let tx_i = tx.clone();
let mut builder = ThreadBuilder::new();
if let Some(ref prefix) = self.name_prefix {
builder = builder.name(format!("{prefix}-{i}"));
}
let name = self
.name_prefix
.as_ref()
.map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}"));
#[cfg(feature = "prometheus")]
let mut runner = GRPCRunner::new(&name);
builder = builder.name(name);
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let handle = builder
.spawn(move || {
if let Some(f) = after_start {
f();
}
#[cfg(feature = "prometheus")]
runner.run(tx_i);
#[cfg(not(feature = "prometheus"))]
poll_queue(tx_i);
if let Some(f) = before_stop {
f();
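The runner above records into `LocalHistogram`s and flushes them at most once per `METRICS_FLUSH_INTERVAL` (10s), so the per-event cost is a buffered thread-local write rather than a synchronized update of the shared histogram. A minimal standalone sketch of that observe-then-flush pattern (the function, loop body, and names are illustrative, not the PR's code):

```rust
use prometheus::HistogramVec;
use std::time::{Duration, Instant};

fn run_with_local_metrics(hist_vec: &HistogramVec, pool_name: &str) {
    // `.local()` puts a thread-local buffer in front of the shared histogram.
    let local = hist_vec.with_label_values(&[pool_name]).local();
    let mut last_flush = Instant::now();
    for _ in 0..1_000 {
        let start = Instant::now();
        // ... handle one completion-queue event here ...
        local.observe(start.elapsed().as_secs_f64()); // buffered, no shared write
        if last_flush.elapsed() >= Duration::from_millis(10_000) {
            local.flush(); // publish the buffered samples to the shared vec
            last_flush = Instant::now();
        }
    }
    local.flush(); // final flush so trailing samples are not lost
}
```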
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -40,6 +40,8 @@ mod env;
mod error;
mod log_util;
mod metadata;
#[cfg(feature = "prometheus")]
mod metrics;
mod quota;
mod security;
mod server;
41 changes: 41 additions & 0 deletions src/metrics.rs
@@ -0,0 +1,41 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

//! Metrics of the grpc pool.
use lazy_static::lazy_static;
use prometheus::*;
lazy_static! {
/// Grpc wait duration of one task.
pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_task_wait_duration",
"Bucketed histogram of grpc wait time only for Spawn task",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

// Grpc pool IO handling duration.
pub static ref GRPC_POOL_CQ_NEXT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_cp_next_duration",
"Bucketed histogram of grpc pool wait duration from the completion queue",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

// Grpc pool execute duration.
pub static ref GRPC_POOL_EXECUTE_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_execute_duration",
"Bucketed histogram of grpc pool execute duration for every time",
&["name"],
exponential_buckets(1e-7, 2.0, 30).unwrap() // 100ns ~ 100s
)
.unwrap();

// Grpc pool event task count.
pub static ref GRPC_POOL_EVENT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"grpc_pool_event_task_count",
"Total event task count in grpc pool",
&["name","event"]
)
.unwrap();
}
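The `register_*` macros above place every series in the prometheus default registry, so a host application can expose the new metrics with the crate's stock text encoder. A minimal sketch, assuming only the prometheus crate (the function name is illustrative, not part of this PR):

```rust
use prometheus::{Encoder, TextEncoder};

fn dump_grpc_pool_metrics() -> String {
    // The lazy_static metrics above live in the default registry, so a plain
    // gather() picks them up alongside any other registered series.
    let metric_families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buf)
        .expect("encode prometheus text format");
    String::from_utf8(buf).expect("prometheus text format is UTF-8")
}
```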
10 changes: 9 additions & 1 deletion src/task/executor.rs
@@ -7,7 +7,7 @@
//! Apparently, to minimize context switch, it's better to bind the future to the
//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.

use std::cell::UnsafeCell;
use std::cell::{RefCell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
@@ -21,6 +21,7 @@ use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};
use std::time::Instant;

/// A handle to a `Spawn`.
/// Inner future is expected to be polled in the same thread as cq.
@@ -88,6 +89,7 @@ pub struct SpawnTask {
state: AtomicU8,
kicker: Kicker,
queue: Arc<WorkQueue>,
push_time: RefCell<Instant>,
}

/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
@@ -102,9 +104,14 @@ impl SpawnTask {
state: AtomicU8::new(IDLE),
kicker,
queue,
push_time: RefCell::new(Instant::now()),
}
}

pub fn reset_push_time(&self) -> Instant {
self.push_time.replace(Instant::now())
}

/// Marks the state of this task to NOTIFIED.
///
/// Returns true means the task was IDLE, needs to be scheduled.
@@ -154,6 +161,7 @@ impl ArcWake for SpawnTask {

// It can lead to deadlock if poll the future immediately. So we need to
// defer the work instead.
task.reset_push_time();
if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
// If the queue is shutdown, then the tag will be notified
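`reset_push_time` swaps in `Instant::now()` and returns the previous stamp, so calling `.elapsed()` on the returned value (as `env.rs` does for `CallTag::Spawn`) yields exactly how long the task sat in the queue. A standalone sketch of that read-and-reset contract (the `TimedTask` type is illustrative, not the PR's):

```rust
use std::cell::RefCell;
use std::time::{Duration, Instant};

struct TimedTask {
    push_time: RefCell<Instant>,
}

impl TimedTask {
    // Mirrors SpawnTask::reset_push_time: store `now`, return the old stamp.
    fn reset_push_time(&self) -> Instant {
        self.push_time.replace(Instant::now())
    }
}

fn main() {
    let task = TimedTask { push_time: RefCell::new(Instant::now()) };
    std::thread::sleep(Duration::from_millis(5)); // task "waits" in the queue
    let waited = task.reset_push_time().elapsed(); // roughly the 5ms wait
    println!("queued for {waited:?}");
}
```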
7 changes: 5 additions & 2 deletions src/task/mod.rs
@@ -10,18 +10,21 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use parking_lot::Mutex;

use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::executor::SpawnTask;
use self::promise::{Action as ActionPromise, Batch as BatchPromise};
use crate::call::server::RequestContext;
use crate::call::{BatchContext, Call};
use crate::cq::CompletionQueue;
use crate::error::{Error, Result};
use crate::server::RequestCallContext;

pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};

#[cfg(feature = "prometheus")]
pub(crate) use self::executor::resolve;
pub(crate) use self::promise::BatchResult;
pub use self::promise::BatchType;

@@ -170,7 +173,7 @@ impl CallTag {
}
}

/// Resolve the CallTag with given status.
#[allow(dead_code)]
pub fn resolve(self, cq: &CompletionQueue, success: bool) {
match self {
CallTag::Batch(prom) => prom.resolve(success),
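Taken together, the `name` label on all four metrics is the completion-queue thread name, which defaults to `grpc-pool-{i}` and follows `name_prefix` when one is set. A hypothetical consumer-side sketch using grpcio's existing `EnvBuilder` API, assuming the `prometheus` feature is enabled:

```rust
use grpcio::EnvBuilder;
use std::sync::Arc;

fn main() {
    // Each completion-queue thread is named "my-grpc-{i}"; with the
    // "prometheus" feature on, that name becomes the metrics' `name` label,
    // e.g. grpc_pool_event_task_count{name="my-grpc-0", event="spawn"}.
    let _env = Arc::new(
        EnvBuilder::new()
            .cq_count(2)
            .name_prefix("my-grpc")
            .build(),
    );
    // ... build servers/channels against this environment as usual ...
}
```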