diff --git a/src/svc/telemetry/prometheus.rs b/src/svc/telemetry/prometheus.rs index 06ea40b..48228f5 100644 --- a/src/svc/telemetry/prometheus.rs +++ b/src/svc/telemetry/prometheus.rs @@ -140,6 +140,11 @@ fn apply_labels( labeled_metrics.push(labeled); } + if aggregate_backend_metrics { + return aggregate_metrics_by_cluster(aggregated_metrics); + } + + // if metrics are not aggregated, // worker metrics for (worker_id, worker_metrics) in aggregated_metrics.workers { // proxy metrics (bytes in, accept queue…) @@ -159,60 +164,156 @@ fn apply_labels( labeled_metrics.push(labeled); } - if aggregate_backend_metrics { - let aggregated_values = cluster_metrics.backends.iter().fold( - HashMap::new(), - |mut acc, backend_metrics| { - for (metric_name, value) in &backend_metrics.metrics { - match acc.get_mut(&metric_name) { - Some(aggregated) => { - if let Some(new_value) = - aggregate_filtered_metrics(aggregated, value) - { - acc.insert(metric_name, new_value); - } - } - None => { - acc.insert(metric_name, value.to_owned()); - } - } - } - acc - }, - ); + // backend metrics (several backends for a given cluster) + for backend_metrics in cluster_metrics.backends { + let BackendMetrics { + backend_id, + metrics, + } = backend_metrics; - for (metric_name, value) in aggregated_values { + for (metric_name, value) in metrics { let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(metric_name); + labeled.with_name(&metric_name); labeled.with_label("cluster_id", &cluster_id); + labeled.with_label("backend_id", &backend_id); labeled_metrics.push(labeled); } - } else { - // backend metrics (several backends for a given cluster) - for backend_metrics in cluster_metrics.backends { - let BackendMetrics { - backend_id, - metrics, - } = backend_metrics; - - for (metric_name, value) in metrics { - let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(&metric_name); - labeled.with_label("cluster_id", &cluster_id); - labeled.with_label("backend_id", &backend_id); - labeled_metrics.push(labeled); + } + } + } + + labeled_metrics +} + +/// flatten all metrics to have only the cluster_id being a relevant label +fn aggregate_metrics_by_cluster(aggregated_metrics: AggregatedMetrics) -> Vec { + let mut labeled_metrics = Vec::new(); + + // cluster_id -> (metric_name -> value) + let mut acc: HashMap> = HashMap::new(); + + // PROXY WIDE METRICS + let mut proxy_wide_metrics: HashMap = HashMap::new(); + + for (metric_name, new_value) in &aggregated_metrics.main { + match proxy_wide_metrics.get_mut(metric_name) { + Some(old_value) => { + if let Some(updated_value) = aggregate_filtered_metrics(old_value, &new_value) { + proxy_wide_metrics.insert(metric_name.to_string(), updated_value); + } + } + None => { + proxy_wide_metrics.insert(metric_name.to_string(), new_value.to_owned()); + } + } + } + acc.insert("metrics_with_no_cluster".to_string(), proxy_wide_metrics); + + for (worker_id, worker_metrics) in aggregated_metrics.workers { + println!("iterating over worker {}", worker_id); + for (metric_name, new_value) in &worker_metrics.proxy { + match acc.get_mut("metrics_with_no_cluster") { + Some(aggregated_map) => match aggregated_map.get_mut(metric_name) { + Some(old_value) => { + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_string(), updated_value); + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + aggregated_map.insert(metric_name.to_string(), new_value.to_owned()); + } + }, + None => {} + } + } + + println!("with proxy wide metrics: {:#?}", acc); + + for (cluster_id, cluster_metrics) in &worker_metrics.clusters { + println!("iterating over cluster {}", cluster_id); + + for (metric_name, new_value) in &cluster_metrics.cluster { + println!( + "Registering metric for cluster {}, name: {}, value: {:?}", + cluster_id, metric_name, new_value + ); + match acc.get_mut(cluster_id) { + Some(aggregated_map) => { + if let Some(old_value) = aggregated_map.get_mut(metric_name) { + println!("old_value: {:?}, new_value: {:?}", old_value, new_value); + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_owned(), updated_value); + } + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(metric_name.to_owned(), new_value.to_owned()); + acc.insert(cluster_id.to_owned(), new_map); + } + } + } + + println!("with cluster metrics: {:#?}", acc); + + for backend_metrics in &cluster_metrics.backends { + println!("iterating over backend {}", backend_metrics.backend_id); + + for (metric_name, new_value) in &backend_metrics.metrics { + println!( + "Registering metric for cluster {}, name: {}, value: {:?}", + cluster_id, metric_name, new_value + ); + match acc.get_mut(cluster_id) { + Some(aggregated_map) => { + if let Some(old_value) = aggregated_map.get_mut(metric_name) { + if let Some(updated_value) = + aggregate_filtered_metrics(old_value, &new_value) + { + aggregated_map.insert(metric_name.to_owned(), updated_value); + } + } else { + aggregated_map.insert(metric_name.to_owned(), new_value.to_owned()); + } + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(metric_name.to_owned(), new_value.to_owned()); + acc.insert(cluster_id.to_owned(), new_map); + } } } } + println!("with backend metrics: {:#?}", acc); } } - labeled_metrics + + for (cluster_id, metrics) in acc { + for (metric_name, value) in metrics { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + if &cluster_id != "metrics_with_no_cluster" { + labeled.with_label("cluster_id", &cluster_id); + } + labeled_metrics.push(labeled); + } + } + return labeled_metrics; } fn aggregate_filtered_metrics( left: &FilteredMetrics, right: &FilteredMetrics, ) -> Option { + println!("aggregating {:?} and {:?}", left, right); match (&left.inner, &right.inner) { (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => Some(FilteredMetrics { inner: Some(Inner::Gauge(a + b)),