Skip to content

Commit

Permalink
try(histogram): Local calculate for p99/pxx
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Dec 24, 2024
1 parent 21d6af8 commit 74b3719
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ core_affinity = "0.8.1"
chrono = "0.4.38"
strum = "0.20.0"
strum_macros = "0.20.0"
hdrhistogram = "7.5.4"

[dependencies.logforth]
version = "0.19.1"
Expand Down
12 changes: 10 additions & 2 deletions src/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ use crate::grpc::protobuf::uniffle::{
ShuffleUnregisterByAppIdResponse, ShuffleUnregisterRequest, ShuffleUnregisterResponse,
};
use crate::metric::{
GRPC_BUFFER_REQUIRE_PROCESS_TIME, GRPC_GET_LOCALFILE_DATA_PROCESS_TIME,
GRPC_GET_LOCALFILE_DATA_TRANSPORT_TIME, GRPC_GET_MEMORY_DATA_FREEZE_PROCESS_TIME,
GRPC_BUFFER_REQUIRE_PROCESS_TIME, GRPC_GET_LOCALFILE_DATA_LATENCY,
GRPC_GET_LOCALFILE_DATA_PROCESS_TIME, GRPC_GET_LOCALFILE_DATA_TRANSPORT_TIME,
GRPC_GET_LOCALFILE_INDEX_LATENCY, GRPC_GET_MEMORY_DATA_FREEZE_PROCESS_TIME,
GRPC_GET_MEMORY_DATA_PROCESS_TIME, GRPC_GET_MEMORY_DATA_TRANSPORT_TIME,
GRPC_SEND_DATA_PROCESS_TIME, GRPC_SEND_DATA_TRANSPORT_TIME,
};
Expand All @@ -51,6 +52,7 @@ use fastrace::future::FutureExt;
use fastrace::trace;
use log::{debug, error, info, warn};
use std::collections::HashMap;
use tokio::time::Instant;
use tonic::{Request, Response, Status};

/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
Expand Down Expand Up @@ -315,6 +317,7 @@ impl ShuffleServer for DefaultShuffleServer {
&self,
request: Request<GetLocalShuffleIndexRequest>,
) -> Result<Response<GetLocalShuffleIndexResponse>, Status> {
let start = tokio::time::Instant::now();
let req = request.into_inner();
let app_id = req.app_id;
let shuffle_id: i32 = req.shuffle_id;
Expand Down Expand Up @@ -363,6 +366,8 @@ impl ShuffleServer for DefaultShuffleServer {
}));
}

GRPC_GET_LOCALFILE_INDEX_LATENCY.record(start.elapsed().as_millis() as u64);

match data_index_wrapper.unwrap() {
ResponseDataIndex::Local(data_index) => {
Ok(Response::new(GetLocalShuffleIndexResponse {
Expand All @@ -380,6 +385,7 @@ impl ShuffleServer for DefaultShuffleServer {
&self,
request: Request<GetLocalShuffleDataRequest>,
) -> Result<Response<GetLocalShuffleDataResponse>, Status> {
let start = tokio::time::Instant::now();
let timer = GRPC_GET_LOCALFILE_DATA_PROCESS_TIME.start_timer();
let req = request.into_inner();
let app_id = req.app_id;
Expand Down Expand Up @@ -435,6 +441,8 @@ impl ShuffleServer for DefaultShuffleServer {

timer.observe_duration();

GRPC_GET_LOCALFILE_DATA_LATENCY.record(start.elapsed().as_millis() as u64);

Ok(Response::new(GetLocalShuffleDataResponse {
data: data_fetched_result.unwrap().from_local(),
status: StatusCode::SUCCESS.into(),
Expand Down
55 changes: 55 additions & 0 deletions src/histogram/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::metric::REGISTRY;
use log::{error, info};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

pub struct Histogram {
name: String,
recorder: Mutex<hdrhistogram::Histogram<u64>>,
gauge: IntGaugeVec,
}

impl Histogram {
pub fn new(name: &str) -> Histogram {
info!("Registering metrics for {}", name);
let gauge: IntGaugeVec = { register_int_gauge_vec!(name, name, &["quantile"]).unwrap() };

REGISTRY.register(Box::new(gauge.clone())).expect("");

Self {
name: name.to_owned(),
recorder: Mutex::new(hdrhistogram::Histogram::new(4).unwrap()),
gauge,
}
}

pub fn record(&self, value: u64) {
let mut recorder = self.recorder.lock();
if let Err(e) = recorder.record(value) {
error!("failed to record `{}`: {}", self.name, e);
}
}

fn clear(&self) {
let mut recorder = self.recorder.lock();
recorder.clear()
}

pub fn observe(&self) {
let mut recorder = self.recorder.lock();
let p99 = recorder.value_at_quantile(0.99);
let p95 = recorder.value_at_quantile(0.95);
let p90 = recorder.value_at_quantile(0.90);
let p80 = recorder.value_at_quantile(0.80);
let p50 = recorder.value_at_quantile(0.50);

self.gauge.with_label_values(&["p99"]).set(p99 as i64);
self.gauge.with_label_values(&["p95"]).set(p95 as i64);
self.gauge.with_label_values(&["p90"]).set(p90 as i64);
self.gauge.with_label_values(&["p80"]).set(p80 as i64);
self.gauge.with_label_values(&["p50"]).set(p50 as i64);

recorder.clear();
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ mod reject;
pub mod semaphore_with_index;
pub mod storage;

pub mod histogram;

use crate::app::{AppManager, AppManagerRef};
use crate::common::init_global_variable;
use crate::grpc::protobuf::uniffle::shuffle_server_client::ShuffleServerClient;
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod log_service;
#[cfg(feature = "logforth")]
mod logforth_service;

pub mod histogram;
mod mem_allocator;
mod metric;
mod readable_size;
Expand All @@ -76,6 +77,7 @@ pub mod store;
pub mod tracing;
pub mod urpc;
pub mod util;

const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_LIMIT_SIZE";

fn main() -> Result<()> {
Expand Down
21 changes: 18 additions & 3 deletions src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

use crate::app::SHUFFLE_SERVER_ID;
use crate::config::Config;
use crate::histogram;
use crate::mem_allocator::ALLOCATOR;
use crate::readable_size::ReadableSize;
use crate::runtime::manager::RuntimeManager;
use log::{error, info};
use once_cell::sync::Lazy;
use prometheus::{
histogram_opts, labels, register_gauge_vec, register_histogram_vec_with_registry,
register_int_counter_vec, register_int_gauge_vec, GaugeVec, Histogram, HistogramOpts,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
histogram_opts, labels, register_gauge_vec, register_histogram_vec,
register_histogram_vec_with_registry, register_int_counter_vec, register_int_gauge_vec,
GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Registry,
};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -50,6 +52,12 @@ const SPILL_BATCH_SIZE_BUCKETS: &[f64] = &[
ReadableSize::gb(100).as_bytes() as f64,
];

pub static GRPC_GET_LOCALFILE_DATA_LATENCY: Lazy<histogram::Histogram> =
Lazy::new(|| crate::histogram::Histogram::new("grpc_get_localfile_data_latency"));

pub static GRPC_GET_LOCALFILE_INDEX_LATENCY: Lazy<histogram::Histogram> =
Lazy::new(|| crate::histogram::Histogram::new("grpc_get_localfile_index_latency"));

pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);

pub static TOTAL_RECEIVED_DATA: Lazy<IntCounter> = Lazy::new(|| {
Expand Down Expand Up @@ -101,6 +109,10 @@ pub static GAUGE_MEMORY_SPILL_IN_QUEUE_BYTES: Lazy<IntGauge> = Lazy::new(|| {
.expect("")
});

pub static LATENCY_GENERAL: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!("latency_general", "latency_general", &["name", "quantile"]).unwrap()
});

pub static GRPC_GET_MEMORY_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|| {
let opts = HistogramOpts::new("grpc_get_memory_data_transport_time", "none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
Expand Down Expand Up @@ -836,6 +848,9 @@ impl MetricService {
#[cfg(all(unix, feature = "allocator-analysis"))]
GAUGE_ALLOCATOR_ALLOCATED_SIZE.set(ALLOCATOR.allocated() as i64);

GRPC_GET_LOCALFILE_DATA_LATENCY.observe();
GRPC_GET_LOCALFILE_INDEX_LATENCY.observe();

let general_metrics = prometheus::gather();
let custom_metrics = REGISTRY.gather();
let mut metrics = vec![];
Expand Down

0 comments on commit 74b3719

Please sign in to comment.