diff --git a/Cargo.lock b/Cargo.lock index 1ca5c93..2ca033e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -949,11 +949,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", - "log", "wasi", "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4929e1f84c5e54c3ec6141cd5d8b5a5c055f031f80cf78f2072920173cb4d880" +dependencies = [ + "hermit-abi", + "libc", + "log", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "multimap" version = "0.10.0" @@ -1485,9 +1497,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags", ] @@ -1970,14 +1982,14 @@ dependencies = [ [[package]] name = "sozu-client" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c7967edbe9836d3eed9dd44d3804c5ace8798d6aca2f27926f3e8a00bc5fdc" +checksum = "85fac7a514b946392e349307a71d45df69bbbeb5964ec232b6549f1b2cdba889" dependencies = [ "async-trait", "bb8", "config", - "mio", + "mio 1.0.0", "serde_json", "sozu-command-lib", "tempdir", @@ -1988,15 +2000,15 @@ dependencies = [ [[package]] name = "sozu-command-lib" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff7c4c859df42f9504d8a79630dfa847115903b65425f1b30847cd6979aa2d17" +checksum = "1f587a5cf85a7b8acaed005bca6502b7bd241a3e931d2cfb462327d540086004" dependencies = [ "hex", "libc", "log", "memchr", - "mio", + "mio 1.0.0", "nix", "nom", "pool", @@ -2138,18 +2150,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", @@ -2223,14 +2235,14 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 0.8.11", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -2263,9 +2275,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" +checksum = "ac2caab0bf757388c6c0ae23b3293fdb463fee59434529014f85e3263b995c28" dependencies = [ "serde", "serde_spanned", @@ -2284,9 +2296,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.15" +version = "0.22.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59a3a72298453f564e2b111fa896f8d07fabb36f51f06d7e875fc5e0b5a3ef1" +checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788" dependencies = [ "indexmap", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3a8e395..f0694e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ serde = { version = "^1.0.195", features = ["derive"] } serde_json = "^1.0.111" sentry = { version = "^0.34.0", default-features = false, features = ["backtrace", "contexts", "panic", "reqwest", "rustls"] } sentry-tracing = "^0.34.0" -sozu-client = "^0.4.0" -sozu-command-lib = "^1.0.0" +sozu-client = "^0.4.1" +sozu-command-lib = "^1.0.3" thiserror = "^1.0.56" tokio = { version = "^1.35.1", features = ["macros", "rt", "signal"] } tracing = "^0.1.40" diff --git a/example.config.toml b/example.config.toml index da4d9f1..5c7d102 100644 --- a/example.config.toml +++ b/example.config.toml @@ -1,9 +1,6 @@ # Socket address on which to listen listening-address = "0.0.0.0:3000" -# aggregate backend metrics by cluster id -aggregate-backend-metrics = true - [sozu] # Path to Sōzu's configuration file configuration = "path/to/sozu/config.toml" diff --git a/src/svc/config.rs b/src/svc/config.rs index 78bf7f3..9bafe91 100644 --- a/src/svc/config.rs +++ b/src/svc/config.rs @@ -42,8 +42,6 @@ pub struct Sozu { pub struct ConnectorConfiguration { #[serde(rename = "listening-address")] pub listening_address: SocketAddr, - #[serde(rename = "aggregate-backend-metrics")] - pub aggregate_backend_metrics: bool, #[serde(rename = "sozu")] pub sozu: Sozu, #[serde(rename = "sentry")] diff --git a/src/svc/http/server/handler.rs b/src/svc/http/server/handler.rs index 03ced7c..90b6bc3 100644 --- a/src/svc/http/server/handler.rs +++ b/src/svc/http/server/handler.rs @@ -97,10 +97,7 @@ pub async fn telemetry(State(state): State, _req: Request) content_type: Some(ContentType::Metrics(aggregated_metrics)), }), .. - }) => convert_metrics_to_prometheus( - aggregated_metrics, - state.config.aggregate_backend_metrics, - ), + }) => convert_metrics_to_prometheus(aggregated_metrics), Ok(response) => { let headers = res.headers_mut(); let message = serde_json::json!({ diff --git a/src/svc/telemetry/prometheus.rs b/src/svc/telemetry/prometheus.rs index 0b780cf..38ef733 100644 --- a/src/svc/telemetry/prometheus.rs +++ b/src/svc/telemetry/prometheus.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt::Display}; +use std::{fmt::Display, iter}; use sozu_command_lib::proto::command::{ filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics, @@ -10,7 +10,7 @@ use urlencoding::encode; enum MetricType { Counter, Gauge, - // Histogram, + Histogram, Unsupported, } @@ -19,14 +19,14 @@ impl Display for MetricType { match *self { MetricType::Counter => write!(f, "counter"), MetricType::Gauge => write!(f, "gauge"), - // MetricType::Histogram => write!(f, "histogram"), + MetricType::Histogram => write!(f, "histogram"), MetricType::Unsupported => write!(f, "unsupported"), // should never happen } } } /// convertible to prometheus metric in this form: -/// metric_name{label="something",second_lable="something-else"} value +/// metric_name{label="something",second_label="something-else"} value struct LabeledMetric { metric_name: String, labels: Vec<(String, String)>, @@ -50,33 +50,83 @@ impl LabeledMetric { self.metric_name.replace('.', "_") } + /// Create a type line, typically: + /// /// # TYPE protocol_https gauge fn type_line(&self) -> String { let printable_metric_name = self.printable_name(); format!("# TYPE {} {}", printable_metric_name, self.metric_type) } + /// Format labels in a comma-separated list: + /// + /// ```plain + /// "label"="value","other"="value" + /// ``` + fn formatted_labels(&self) -> String { + self.labels + .iter() + .map(|(name, value)| format!("{}=\"{}\"", name, value)) + .collect::>() + .join(",") + } + + /// Create a metric line, typically: + /// + /// ```plain /// http_active_requests{worker="0"} 0 + /// ``` + /// For histograms, several lines are produced: sum, count, buckets fn metric_line(&self) -> String { let printable_metric_name = self.printable_name(); - let formatted_labels: String = format_labels(&self.labels); - let value = match &self.value.inner { + let formatted_labels = self.formatted_labels(); + match &self.value.inner { Some(inner) => { match inner { - Inner::Gauge(value) => value.to_string(), - Inner::Count(value) => value.to_string(), + Inner::Gauge(value) => format!( + "{}{{{}}} {}", + printable_metric_name, formatted_labels, value + ), + Inner::Count(value) => format!( + "{}{{{}}} {}", + printable_metric_name, formatted_labels, value + ), + Inner::Histogram(hist) => hist + .buckets + .iter() + .map(|bucket| { + if formatted_labels.is_empty() { + format!( + "{}_bucket{{le=\"{}\"}} {}\n", + printable_metric_name, bucket.le, bucket.count + ) + } else { + format!( + "{}_bucket{{{}, le=\"{}\"}} {}\n", + printable_metric_name, + formatted_labels, + bucket.le, + bucket.count + ) + } + }) + .chain(iter::once(format!( + "{}_sum{{{}}} {}\n", + printable_metric_name, formatted_labels, hist.sum + ))) + .chain(iter::once(format!( + "{}_count{{{}}} {}", + printable_metric_name, formatted_labels, hist.count + ))) + .collect::(), Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => { // should not happen at that point - return String::new(); + String::new() } } } - None => return String::new(), - }; - format!( - "{}{{{}}} {}", - printable_metric_name, formatted_labels, value - ) + None => String::new(), + } } } @@ -86,6 +136,7 @@ impl From for LabeledMetric { Some(inner) => match inner { Inner::Gauge(_) => MetricType::Gauge, Inner::Count(_) => MetricType::Counter, + Inner::Histogram(_) => MetricType::Histogram, Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => { MetricType::Unsupported } @@ -103,12 +154,9 @@ impl From for LabeledMetric { /// Convert aggregated metrics into prometheus serialize one #[tracing::instrument(skip_all)] -pub fn convert_metrics_to_prometheus( - aggregated_metrics: AggregatedMetrics, - convert_backend_metrics: bool, -) -> String { +pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String { debug!("Converting metrics to prometheus format"); - let labeled_metrics = apply_labels(aggregated_metrics, convert_backend_metrics); + let labeled_metrics = apply_labels(aggregated_metrics); let metric_names = get_unique_metric_names(&labeled_metrics); @@ -125,186 +173,54 @@ pub fn convert_metrics_to_prometheus( } /// assign worker_id and cluster_id as labels -fn apply_labels( - aggregated_metrics: AggregatedMetrics, - aggregate_backend_metrics: bool, -) -> Vec { +fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec { let mut labeled_metrics = Vec::new(); // metrics of the main process for (metric_name, value) in aggregated_metrics.main.iter() { let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(metric_name); - labeled.with_label("worker", "main"); + labeled.with_name(&format!("{}_main", metric_name)); labeled_metrics.push(labeled); } - if aggregate_backend_metrics { - return aggregate_metrics_by_cluster(aggregated_metrics); + // proxying metrics + for (metric_name, value) in aggregated_metrics.proxying.iter() { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&format!("{}_total", metric_name)); + + labeled_metrics.push(labeled); } - // if metrics are not aggregated, - // worker metrics - for (worker_id, worker_metrics) in aggregated_metrics.workers { - // proxy metrics (bytes in, accept queue…) - for (metric_name, value) in worker_metrics.proxy { + // cluster metrics (applications) + for (cluster_id, cluster_metrics) in aggregated_metrics.clusters { + for (metric_name, value) in cluster_metrics.cluster { let mut labeled = LabeledMetric::from(value.clone()); labeled.with_name(&metric_name); - labeled.with_label("worker", &worker_id); + labeled.with_label("cluster_id", &cluster_id); labeled_metrics.push(labeled); } - // cluster metrics (applications) - for (cluster_id, cluster_metrics) in worker_metrics.clusters { - for (metric_name, value) in cluster_metrics.cluster { + // backend metrics (several backends for a given cluster) + for backend_metrics in cluster_metrics.backends { + let BackendMetrics { + backend_id, + metrics, + } = backend_metrics; + + for (metric_name, value) in metrics { let mut labeled = LabeledMetric::from(value.clone()); labeled.with_name(&metric_name); labeled.with_label("cluster_id", &cluster_id); + labeled.with_label("backend_id", &backend_id); labeled_metrics.push(labeled); } - - // backend metrics (several backends for a given cluster) - for backend_metrics in cluster_metrics.backends { - let BackendMetrics { - backend_id, - metrics, - } = backend_metrics; - - for (metric_name, value) in metrics { - let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(&metric_name); - labeled.with_label("cluster_id", &cluster_id); - labeled.with_label("backend_id", &backend_id); - labeled_metrics.push(labeled); - } - } } } labeled_metrics } -/// flatten all metrics to have only the cluster_id being a relevant label -fn aggregate_metrics_by_cluster(aggregated_metrics: AggregatedMetrics) -> Vec { - let mut labeled_metrics = Vec::new(); - - // cluster_id -> (metric_name -> value) - let mut acc: HashMap> = HashMap::new(); - - // PROXY WIDE METRICS - let mut proxy_wide_metrics: HashMap = HashMap::new(); - - for (metric_name, new_value) in &aggregated_metrics.main { - match proxy_wide_metrics.get_mut(metric_name) { - Some(old_value) => { - if let Some(updated_value) = aggregate_filtered_metrics(old_value, &new_value) { - proxy_wide_metrics.insert(metric_name.to_string(), updated_value); - } - } - None => { - proxy_wide_metrics.insert(metric_name.to_string(), new_value.to_owned()); - } - } - } - acc.insert("metrics_with_no_cluster".to_string(), proxy_wide_metrics); - - for (_worker_id, worker_metrics) in aggregated_metrics.workers { - for (metric_name, new_value) in &worker_metrics.proxy { - match acc.get_mut("metrics_with_no_cluster") { - Some(aggregated_map) => match aggregated_map.get_mut(metric_name) { - Some(old_value) => { - if let Some(updated_value) = - aggregate_filtered_metrics(old_value, &new_value) - { - aggregated_map.insert(metric_name.to_string(), updated_value); - } else { - aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); - } - } - None => { - aggregated_map.insert(metric_name.to_string(), new_value.to_owned()); - } - }, - None => {} - } - } - - for (cluster_id, cluster_metrics) in &worker_metrics.clusters { - for (metric_name, new_value) in &cluster_metrics.cluster { - match acc.get_mut(cluster_id) { - Some(aggregated_map) => { - if let Some(old_value) = aggregated_map.get_mut(metric_name) { - if let Some(updated_value) = - aggregate_filtered_metrics(old_value, &new_value) - { - aggregated_map.insert(metric_name.to_owned(), updated_value); - } - } else { - aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); - } - } - None => { - let mut new_map = HashMap::new(); - new_map.insert(metric_name.to_owned(), new_value.to_owned()); - acc.insert(cluster_id.to_owned(), new_map); - } - } - } - - for backend_metrics in &cluster_metrics.backends { - for (metric_name, new_value) in &backend_metrics.metrics { - match acc.get_mut(cluster_id) { - Some(aggregated_map) => { - if let Some(old_value) = aggregated_map.get_mut(metric_name) { - if let Some(updated_value) = - aggregate_filtered_metrics(old_value, &new_value) - { - aggregated_map.insert(metric_name.to_owned(), updated_value); - } - } else { - aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); - } - } - None => { - let mut new_map = HashMap::new(); - new_map.insert(metric_name.to_owned(), new_value.to_owned()); - acc.insert(cluster_id.to_owned(), new_map); - } - } - } - } - } - } - - for (cluster_id, metrics) in acc { - for (metric_name, value) in metrics { - let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(&metric_name); - if &cluster_id != "metrics_with_no_cluster" { - labeled.with_label("cluster_id", &cluster_id); - } - labeled_metrics.push(labeled); - } - } - return labeled_metrics; -} - -fn aggregate_filtered_metrics( - left: &FilteredMetrics, - right: &FilteredMetrics, -) -> Option { - match (&left.inner, &right.inner) { - (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => Some(FilteredMetrics { - inner: Some(Inner::Gauge(a + b)), - }), - (Some(Inner::Count(a)), Some(Inner::Count(b))) => Some(FilteredMetrics { - inner: Some(Inner::Count(a + b)), - }), - _ => None, - } -} - fn get_unique_metric_names(labeled_metrics: &Vec) -> Vec { let mut names = Vec::new(); for metric in labeled_metrics { @@ -350,20 +266,12 @@ fn replace_dots_with_underscores(str: &str) -> String { str.replace('.', "_") } -fn format_labels(labels: &[(String, String)]) -> String { - labels - .iter() - .map(|(name, value)| format!("{}=\"{}\"", name, value)) - .collect::>() - .join(",") -} - #[cfg(test)] mod test { use std::collections::BTreeMap; use sozu_command_lib::proto::command::{ - filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics, WorkerMetrics, + filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics, }; use super::*; @@ -387,20 +295,12 @@ mod test { let mut clusters = BTreeMap::new(); clusters.insert(cluster_id, cluster_metrics); - let worker_metrics = WorkerMetrics { - proxy: BTreeMap::new(), - clusters, - }; - - let mut workers = BTreeMap::new(); - workers.insert("WORKER-01".to_string(), worker_metrics); - let aggregated_metrics = AggregatedMetrics { - main: BTreeMap::new(), - workers, + clusters, + ..Default::default() }; - let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics, true); + let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics); let expected = r#"# TYPE http_response_status gauge http_response_status{cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3Dvalue"} 3 @@ -408,85 +308,25 @@ http_response_status{cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3D assert_eq!(expected.to_string(), prometheus_metrics); } -} -/* this is all false - -/// convert a Sōzu Percentiles struct into prometheus histogram lines: -/// ``` -/// # TYPE metric_name histogram -/// metric_name_bucket{le="0.5"} value -/// metric_name_bucket{le="0.9"} value -/// metric_name_bucket{le="0.99"} value -/// metric_name_bucket{le="0.999"} value -/// metric_name_bucket{le="0.9999"} value -/// metric_name_bucket{le="0.99999"} value -/// metric_name_bucket{le="1"} value -/// metric_name_sum sum-of-measurements -/// metric_name_count percentiles.samples -/// ``` -/// (additionnal labels not show between the brackets) -#[tracing::instrument(skip(percentiles))] -fn create_percentile_lines( - metric_name: &str, - labels: &[(&str, &str)], - percentiles: &Percentiles, -) -> String { - let bucket_name = format!("{}_bucket", metric_name); - let sum = 0; // we can not compute it as of version 0.15.3 of sozu-command-lib + #[test] + fn format_labels() { + let metric = FilteredMetrics { + inner: Some(Inner::Count(3)), + }; + let mut labeled_metric = LabeledMetric::from(metric); - let mut lines = String::new(); - let sample_line = create_metric_line_with_labels( - &format!("{}_samples", metric_name), - labels, - percentiles.samples, - ); - let p_50_line = create_histogram_line(&bucket_name, "0.5", labels, percentiles.p_50); - let p_90_line = create_histogram_line(&bucket_name, "0.9", labels, percentiles.p_90); - let p_99_line = create_histogram_line(&bucket_name, "0.99", labels, percentiles.p_99); - let p_99_9_line = create_histogram_line(&bucket_name, "0.999", labels, percentiles.p_99_9); - let p_99_99_line = create_histogram_line(&bucket_name, "0.9999", labels, percentiles.p_99_99); - let p_99_999_line = - create_histogram_line(&bucket_name, "0.99999", labels, percentiles.p_99_999); - let p_100_line = create_histogram_line(&bucket_name, "1", labels, percentiles.p_100); - let inf_line = create_histogram_line(&bucket_name, "+Inf", labels, percentiles.p_100); - let sum_line = create_metric_line_with_labels(&format!("{}_sum", metric_name), labels, sum); - lines.push_str(&sample_line); - lines.push('\n'); - lines.push_str(&p_50_line); - lines.push('\n'); - lines.push_str(&p_90_line); - lines.push('\n'); - lines.push_str(&p_99_line); - lines.push('\n'); - lines.push_str(&p_99_9_line); - lines.push('\n'); - lines.push_str(&p_99_99_line); - lines.push('\n'); - lines.push_str(&p_99_999_line); - lines.push('\n'); - lines.push_str(&p_100_line); - lines.push('\n'); - lines.push_str(&inf_line); - lines.push('\n'); - lines.push_str(&sum_line); - lines.push('\n'); + assert_eq!(labeled_metric.formatted_labels(), ""); - lines -} + labeled_metric.with_label("le", "3"); -fn create_histogram_line( - bucket_name: &str, - less_than: &str, - labels: &[(&str, &str)], - value: T, -) -> String -where - T: ToString, -{ - let mut labels = labels.to_owned(); - labels.push(("le", less_than)); - create_metric_line_with_labels(bucket_name, &labels, value) -} + assert_eq!(labeled_metric.formatted_labels(), r#"le="3""#); + + labeled_metric.with_label("cluster_id", "http://my-cluster-id.com/api?param=value"); -*/ + assert_eq!( + labeled_metric.formatted_labels(), + r#"le="3",cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3Dvalue""# + ) + } +}