From 3b26d185bd0a035b573eedfb7ef1cb254193208c Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Tue, 24 Oct 2023 21:20:08 +0200 Subject: [PATCH] aggregate-metrics-by-cluster --- src/svc/telemetry/prometheus.rs | 157 ++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 38 deletions(-) diff --git a/src/svc/telemetry/prometheus.rs b/src/svc/telemetry/prometheus.rs index 06ea40b..0b780cf 100644 --- a/src/svc/telemetry/prometheus.rs +++ b/src/svc/telemetry/prometheus.rs @@ -140,6 +140,11 @@ fn apply_labels( labeled_metrics.push(labeled); } + if aggregate_backend_metrics { + return aggregate_metrics_by_cluster(aggregated_metrics); + } + + // if metrics are not aggregated, // worker metrics for (worker_id, worker_metrics) in aggregated_metrics.workers { // proxy metrics (bytes in, accept queue…) @@ -159,54 +164,130 @@ fn apply_labels( labeled_metrics.push(labeled); } - if aggregate_backend_metrics { - let aggregated_values = cluster_metrics.backends.iter().fold( - HashMap::new(), - |mut acc, backend_metrics| { - for (metric_name, value) in &backend_metrics.metrics { - match acc.get_mut(&metric_name) { - Some(aggregated) => { - if let Some(new_value) = - aggregate_filtered_metrics(aggregated, value) - { - acc.insert(metric_name, new_value); - } - } - None => { - acc.insert(metric_name, value.to_owned()); - } - } - } - acc - }, - ); + // backend metrics (several backends for a given cluster) + for backend_metrics in cluster_metrics.backends { + let BackendMetrics { + backend_id, + metrics, + } = backend_metrics; - for (metric_name, value) in aggregated_values { + for (metric_name, value) in metrics { let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(metric_name); + labeled.with_name(&metric_name); labeled.with_label("cluster_id", &cluster_id); + labeled.with_label("backend_id", &backend_id); labeled_metrics.push(labeled); } - } else { - // backend metrics (several backends for a given cluster) - for backend_metrics in cluster_metrics.backends { - let BackendMetrics { - backend_id, - metrics, - } = backend_metrics; - - for (metric_name, value) in metrics { - let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(&metric_name); - labeled.with_label("cluster_id", &cluster_id); - labeled.with_label("backend_id", &backend_id); - labeled_metrics.push(labeled); + } + } + } + + labeled_metrics +} + +/// flatten all metrics to have only the cluster_id being a relevant label +fn aggregate_metrics_by_cluster(aggregated_metrics: AggregatedMetrics) -> Vec { + let mut labeled_metrics = Vec::new(); + + // cluster_id -> (metric_name -> value) + let mut acc: HashMap> = HashMap::new(); + + // PROXY WIDE METRICS + let mut proxy_wide_metrics: HashMap = HashMap::new(); + + for (metric_name, new_value) in &aggregated_metrics.main { + match proxy_wide_metrics.get_mut(metric_name) { + Some(old_value) => { + if let Some(updated_value) = aggregate_filtered_metrics(old_value, &new_value) { + proxy_wide_metrics.insert(metric_name.to_string(), updated_value); + } + } + None => { + proxy_wide_metrics.insert(metric_name.to_string(), new_value.to_owned()); + } + } + } + acc.insert("metrics_with_no_cluster".to_string(), proxy_wide_metrics); + + for (_worker_id, worker_metrics) in aggregated_metrics.workers { + for (metric_name, new_value) in &worker_metrics.proxy { + match acc.get_mut("metrics_with_no_cluster") { + Some(aggregated_map) => match aggregated_map.get_mut(metric_name) { + Some(old_value) => { + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_string(), updated_value); + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + aggregated_map.insert(metric_name.to_string(), new_value.to_owned()); + } + }, + None => {} + } + } + + for (cluster_id, cluster_metrics) in &worker_metrics.clusters { + for (metric_name, new_value) in &cluster_metrics.cluster { + match acc.get_mut(cluster_id) { + Some(aggregated_map) => { + if let Some(old_value) = aggregated_map.get_mut(metric_name) { + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_owned(), updated_value); + } + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(metric_name.to_owned(), new_value.to_owned()); + acc.insert(cluster_id.to_owned(), new_map); + } + } + } + + for backend_metrics in &cluster_metrics.backends { + for (metric_name, new_value) in &backend_metrics.metrics { + match acc.get_mut(cluster_id) { + Some(aggregated_map) => { + if let Some(old_value) = aggregated_map.get_mut(metric_name) { + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_owned(), updated_value); + } + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(metric_name.to_owned(), new_value.to_owned()); + acc.insert(cluster_id.to_owned(), new_map); + } } } } } } - labeled_metrics + + for (cluster_id, metrics) in acc { + for (metric_name, value) in metrics { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + if &cluster_id != "metrics_with_no_cluster" { + labeled.with_label("cluster_id", &cluster_id); + } + labeled_metrics.push(labeled); + } + } + return labeled_metrics; } fn aggregate_filtered_metrics(