From fefb2f3d446d1a7d71d921b3bd12e5f93c9dc125 Mon Sep 17 00:00:00 2001 From: vicanso Date: Sun, 22 Dec 2024 14:48:00 +0800 Subject: [PATCH] refactor: adjust prometheus metrics --- src/proxy/server.rs | 14 +- src/state/prom.rs | 320 +++++++++++++++++++++++++++----------------- 2 files changed, 203 insertions(+), 131 deletions(-) diff --git a/src/proxy/server.rs b/src/proxy/server.rs index 0fa2126..5246144 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -511,12 +511,6 @@ impl ProxyHttp for Server { } } - // set perometheus stats - #[cfg(feature = "full")] - if let Some(prom) = &self.prometheus { - prom.before(); - } - // locations not found let Some(locations) = get_server_locations(&self.name) else { return Ok(()); @@ -537,6 +531,14 @@ impl ProxyHttp for Server { break; } } + // set perometheus stats + #[cfg(feature = "full")] + if let Some(prom) = &self.prometheus { + let location_name = + ctx.location.as_ref().map_or("", |item| &item.name); + prom.before(location_name); + } + if let Some(location) = &ctx.location { location .validate_content_length(header) diff --git a/src/state/prom.rs b/src/state/prom.rs index d4e981c..d1b6a04 100644 --- a/src/state/prom.rs +++ b/src/state/prom.rs @@ -19,7 +19,9 @@ use humantime::parse_duration; use once_cell::sync::Lazy; use pingora::proxy::Session; use prometheus::core::Collector; -use prometheus::{Encoder, Opts, ProtobufEncoder, Registry, TextEncoder}; +use prometheus::{ + Encoder, HistogramVec, Opts, ProtobufEncoder, Registry, TextEncoder, +}; use prometheus::{ Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, }; @@ -55,21 +57,21 @@ pub static CACHE_WRITING_TIME: Lazy> = Lazy::new(|| { pub struct Prometheus { r: Registry, - http_request_accepted: Box, - http_request_processing: Box, - http_reqesut_body_received: Box, - http_response_codes: Box, - http_response_time: Box, - http_response_body_sent: Box, - connection_reused: Box, + http_requests_total: Box, + http_requests_current: Box, + http_received: Box, + http_responses_codes: Box, + http_response_time: Box, + http_sent: Box, + connection_reuses: Box, tls_handshake_time: Box, - upstream_connected: Box, - upstream_processing: Box, - upstream_tcp_connect_time: Box, - upstream_tls_handshake_time: Box, - upstream_reused: Box, - upstream_processing_time: Box, - upstream_response_time: Box, + upstream_connections: Box, + upstream_connections_current: Box, + upstream_tcp_connect_time: Box, + upstream_tls_handshake_time: Box, + upstream_reuses: Box, + upstream_processing_time: Box, + upstream_response_time: Box, cache_lookup_time: Box, cache_lock_time: Box, cache_reading: Box, @@ -84,90 +86,127 @@ pub struct Prometheus { const SECOND: f64 = 1000.0; impl Prometheus { - pub fn before(&self) { - self.http_request_accepted.inc(); - self.http_request_processing.inc(); + pub fn before(&self, location: &str) { + self.http_requests_total.with_label_values(&[""]).inc(); + self.http_requests_current.with_label_values(&[""]).inc(); + if !location.is_empty() { + self.http_requests_total + .with_label_values(&[location]) + .inc(); + self.http_requests_current + .with_label_values(&[location]) + .inc(); + } } pub fn after(&self, session: &Session, ctx: &State) { - let ms = (util::now().as_millis() as u64) - ctx.created_at; + let mut location = ""; + let mut upstream = ""; + if let Some(lo) = &ctx.location { + location = &lo.name; + upstream = &lo.upstream; + } + let response_time = + ((util::now().as_millis() as u64) - ctx.created_at) as f64 / SECOND; + // payload size(kb) + let payload_size = ctx.payload_size as f64 / 1024.0; let mut code = 0; if let Some(status) = &ctx.status { code = status.as_u16(); } - self.http_request_processing.dec(); + let sent = session.body_bytes_sent() as f64 / 1024.0; // http response code - let label_values = match code { - 100..=199 => Some(["1xx"]), - 200..=299 => Some(["2xx"]), - 300..=399 => Some(["3xx"]), - 400..=499 => Some(["4xx"]), - 500..=599 => Some(["5xx"]), - _ => None, + let code_label = match code { + 100..=199 => "1xx", + 200..=299 => "2xx", + 300..=399 => "3xx", + 400..=499 => "4xx", + 500..=599 => "5xx", + _ => "unknown", }; - if let Some(label_values) = &label_values { - self.http_response_codes - .with_label_values(label_values) - .inc(); + let mut labels_list = Vec::with_capacity(2); + labels_list.push([""]); + if !location.is_empty() { + labels_list.push([location]); + } + for labels in labels_list.iter() { + self.http_requests_current.with_label_values(labels).dec(); + self.http_received + .with_label_values(labels) + .observe(payload_size); + // response time x second + self.http_response_time + .with_label_values(labels) + .observe(response_time); + + // response body size(kb) + self.http_sent.with_label_values(labels).observe(sent); } - // response time x second - self.http_response_time.observe(ms as f64 / SECOND); + self.http_responses_codes + .with_label_values(&["", code_label]) + .inc(); + + if !location.is_empty() { + self.http_responses_codes + .with_label_values(&[location, code_label]) + .inc(); + } // reused connection if ctx.connection_reused { - self.connection_reused.inc(); + self.connection_reuses.inc(); } if let Some(tls_handshake_time) = ctx.tls_handshake_time { self.tls_handshake_time .observe(tls_handshake_time as f64 / SECOND); } - // response body size(kb) - self.http_response_body_sent - .observe(session.body_bytes_sent() as f64 / 1024.0); - // payload size(kb) - if ctx.payload_size != 0 { - self.http_reqesut_body_received - .observe(ctx.payload_size as f64 / 1024.0); - } - - // location stats - if let Some(lo) = &ctx.location { + // upstream + if !upstream.is_empty() { + let upstream_labels = &[upstream]; if let Some(count) = ctx.upstream_connected { - self.upstream_connected - .with_label_values(&[&lo.upstream]) + self.upstream_connections + .with_label_values(upstream_labels) .set(count as i64); } if let Some(count) = ctx.upstream_processing { - self.upstream_processing - .with_label_values(&[&lo.upstream]) + self.upstream_connections_current + .with_label_values(upstream_labels) .set(count as i64); } - } - - // upstream stats - if let Some(upstream_tcp_connect_time) = ctx.upstream_tcp_connect_time { - self.upstream_tcp_connect_time - .observe(upstream_tcp_connect_time as f64 / SECOND); - } - if let Some(upstream_tls_handshake_time) = - ctx.upstream_tls_handshake_time - { - self.upstream_tls_handshake_time - .observe(upstream_tls_handshake_time as f64 / SECOND); - } - if ctx.upstream_reused { - self.upstream_reused.inc(); - } - if let Some(upstream_processing_time) = ctx.upstream_processing_time { - self.upstream_processing_time - .observe(upstream_processing_time as f64 / SECOND); - } - if let Some(upstream_response_time) = ctx.upstream_response_time { - self.upstream_response_time - .observe(upstream_response_time as f64 / SECOND); + // upstream stats + if let Some(upstream_tcp_connect_time) = + ctx.upstream_tcp_connect_time + { + self.upstream_tcp_connect_time + .with_label_values(upstream_labels) + .observe(upstream_tcp_connect_time as f64 / SECOND); + } + if let Some(upstream_tls_handshake_time) = + ctx.upstream_tls_handshake_time + { + self.upstream_tls_handshake_time + .with_label_values(upstream_labels) + .observe(upstream_tls_handshake_time as f64 / SECOND); + } + if ctx.upstream_reused { + self.upstream_reuses + .with_label_values(upstream_labels) + .inc(); + } + if let Some(upstream_processing_time) = ctx.upstream_processing_time + { + self.upstream_processing_time + .with_label_values(upstream_labels) + .observe(upstream_processing_time as f64 / SECOND); + } + if let Some(upstream_response_time) = ctx.upstream_response_time { + self.upstream_response_time + .with_label_values(upstream_labels) + .observe(upstream_response_time as f64 / SECOND); + } } // cache stats @@ -351,7 +390,7 @@ fn new_int_counter_vec( Ok(counter) } -fn new_intguage_vec( +fn new_int_gauge_vec( server: &str, name: &str, help: &str, @@ -385,50 +424,76 @@ fn new_histogram( })?; Ok(histogram) } +fn new_histogram_vec( + server: &str, + name: &str, + help: &str, + label_names: &[&str], + buckets: &[f64], +) -> Result { + let mut opts = HistogramOpts::new(name, help); + if !server.is_empty() { + opts = opts.const_label("server", server); + } + opts = opts.buckets(buckets.into()); + + let histogram = HistogramVec::new(opts, label_names).map_err(|e| { + Error::Prometheus { + message: e.to_string(), + } + })?; + + Ok(histogram) +} /// Create a prometheus metrics for server pub fn new_prometheus(server: &str) -> Result { let r = Registry::new(); - let http_request_accepted = Box::new(new_int_counter( + let http_requests_total = Box::new(new_int_counter_vec( server, - "pingap_http_request_accepted", - "pingap http request accepted count", + "pingap_http_requests_total", + "pingap total http requests", + &["location"], )?); - let http_request_processing = Box::new(new_int_gauge( + let http_requests_current = Box::new(new_int_gauge_vec( server, - "pingap_http_request_processing", - "pingap http request processing count", + "pingap_http_requests_current", + "pingap current http requests", + &["location"], )?); - let http_reqesut_body_received = Box::new(new_histogram( + let http_received = Box::new(new_histogram_vec( server, - "pingap_http_reqesut_body_received", - "pingap http request body received(KB)", + "pingap_http_received", + "pingap http received from clients(KB)", + &["location"], &[1.0, 5.0, 10.0, 50.0, 100.0, 1000.0], )?); - let http_response_codes = Box::new(new_int_counter_vec( + let http_responses_codes = Box::new(new_int_counter_vec( server, - "pingap_http_response_codes", - "pingap http response codes", - &["status_code"], + "pingap_http_responses_codes", + "pingap total responses sent to clients by code", + &["location", "code"], )?); - let http_response_time = Box::new(new_histogram( + let http_response_time = Box::new(new_histogram_vec( server, "pingap_http_response_time", "pingap http response time(second)", + &["location"], &[ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, ], )?); - let http_response_body_sent = Box::new(new_histogram( + let http_sent = Box::new(new_histogram_vec( server, - "pingap_http_response_body_sent", - "pingap http response body send(KB)", + "pingap_http_sent", + "pingap http sent to clients(KB)", + &["location"], &[1.0, 5.0, 10.0, 50.0, 100.0, 1000.0, 10000.0], )?); - let connection_reused = Box::new(new_int_counter( + let connection_reuses = Box::new(new_int_counter( server, - "pingap_connection_reused", - "pingap connection reused count", + "pingap_connection_reuses", + "pingap connection reuses during tcp connect", )?); let tls_handshake_time = Box::new(new_histogram( server, @@ -437,45 +502,50 @@ pub fn new_prometheus(server: &str) -> Result { &[0.01, 0.05, 0.1, 0.5, 1.0], )?); - let upstream_connected = Box::new(new_intguage_vec( + let upstream_connections = Box::new(new_int_gauge_vec( server, - "pingap_upstream_connected", - "pingap upstream connected count", + "pingap_upstream_connections", + "pingap connected connections of upstream", &["upstream"], )?); - let upstream_processing = Box::new(new_intguage_vec( + let upstream_connections_current = Box::new(new_int_gauge_vec( server, - "pingap_upstream_processing", - "pingap upstream processing count", + "pingap_upstream_connections_current", + "pingap current connections of upstream", &["upstream"], )?); - let upstream_tcp_connect_time = Box::new(new_histogram( + let upstream_tcp_connect_time = Box::new(new_histogram_vec( server, "pingap_upstream_tcp_connect_time", "pingap upstream tcp connect time(second)", + &["upstream"], &[0.005, 0.01, 0.05, 0.1, 0.5, 1.0], )?); - let upstream_tls_handshake_time = Box::new(new_histogram( + let upstream_tls_handshake_time = Box::new(new_histogram_vec( server, "pingap_upstream_tls_handshake_time", "pingap upstream tsl handshake time(second)", + &["upstream"], &[0.01, 0.05, 0.1, 0.5, 1.0], )?); - let upstream_reused = Box::new(new_int_counter( + let upstream_reuses = Box::new(new_int_counter_vec( server, - "pingap_upstream_reused", - "pingap upstream reused count", + "pingap_upstream_reuses", + "pingap connection reuse during connect to upstream", + &["upstream"], )?); - let upstream_processing_time = Box::new(new_histogram( + let upstream_processing_time = Box::new(new_histogram_vec( server, "pingap_upstream_processing_time", "pingap upstream processing time(second)", + &["upstream"], &[0.01, 0.02, 0.1, 0.5, 1.0, 5.0, 10.0], )?); - let upstream_response_time = Box::new(new_histogram( + let upstream_response_time = Box::new(new_histogram_vec( server, "pingap_upstream_response_time", "pingap upstream response time(second)", + &["upstream"], &[0.005, 0.01, 0.05, 0.1, 0.5, 1.0], )?); let cache_lookup_time = Box::new(new_histogram( @@ -529,19 +599,19 @@ pub fn new_prometheus(server: &str) -> Result { )?); let collectors: Vec> = vec![ - http_request_accepted.clone(), - http_request_processing.clone(), - http_reqesut_body_received.clone(), - http_response_codes.clone(), + http_requests_total.clone(), + http_requests_current.clone(), + http_received.clone(), + http_responses_codes.clone(), http_response_time.clone(), - http_response_body_sent.clone(), - connection_reused.clone(), + http_sent.clone(), + connection_reuses.clone(), tls_handshake_time.clone(), - upstream_connected.clone(), - upstream_processing.clone(), + upstream_connections.clone(), + upstream_connections_current.clone(), upstream_tcp_connect_time.clone(), upstream_tls_handshake_time.clone(), - upstream_reused.clone(), + upstream_reuses.clone(), upstream_processing_time.clone(), upstream_response_time.clone(), cache_lookup_time.clone(), @@ -564,19 +634,19 @@ pub fn new_prometheus(server: &str) -> Result { Ok(Prometheus { r, - http_request_accepted, - http_request_processing, - http_reqesut_body_received, - http_response_codes, + http_requests_total, + http_requests_current, + http_received, + http_responses_codes, http_response_time, - http_response_body_sent, - connection_reused, + http_sent, + connection_reuses, tls_handshake_time, - upstream_connected, - upstream_processing, + upstream_connections, + upstream_connections_current, upstream_tcp_connect_time, upstream_tls_handshake_time, - upstream_reused, + upstream_reuses, upstream_processing_time, upstream_response_time, cache_lookup_time, @@ -624,7 +694,7 @@ mod tests { session.read_request().await.unwrap(); let p = new_prometheus("pingap").unwrap(); - p.before(); + p.before(""); p.after( &session,