Skip to content

Commit

Permalink
aggregate-metrics-by-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Oct 25, 2023
1 parent 72ed168 commit 3b26d18
Showing 1 changed file with 119 additions and 38 deletions.
157 changes: 119 additions & 38 deletions src/svc/telemetry/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ fn apply_labels(
labeled_metrics.push(labeled);
}

if aggregate_backend_metrics {
return aggregate_metrics_by_cluster(aggregated_metrics);
}

// if metrics are not aggregated,
// worker metrics
for (worker_id, worker_metrics) in aggregated_metrics.workers {
// proxy metrics (bytes in, accept queue…)
Expand All @@ -159,54 +164,130 @@ fn apply_labels(
labeled_metrics.push(labeled);
}

if aggregate_backend_metrics {
let aggregated_values = cluster_metrics.backends.iter().fold(
HashMap::new(),
|mut acc, backend_metrics| {
for (metric_name, value) in &backend_metrics.metrics {
match acc.get_mut(&metric_name) {
Some(aggregated) => {
if let Some(new_value) =
aggregate_filtered_metrics(aggregated, value)
{
acc.insert(metric_name, new_value);
}
}
None => {
acc.insert(metric_name, value.to_owned());
}
}
}
acc
},
);
// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in aggregated_values {
for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(metric_name);
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
} else {
// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
}
}

labeled_metrics
}

/// flatten all metrics to have only the cluster_id being a relevant label
fn aggregate_metrics_by_cluster(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
let mut labeled_metrics = Vec::new();

// cluster_id -> (metric_name -> value)
let mut acc: HashMap<String, HashMap<String, FilteredMetrics>> = HashMap::new();

// PROXY WIDE METRICS
let mut proxy_wide_metrics: HashMap<String, FilteredMetrics> = HashMap::new();

for (metric_name, new_value) in &aggregated_metrics.main {
match proxy_wide_metrics.get_mut(metric_name) {
Some(old_value) => {
if let Some(updated_value) = aggregate_filtered_metrics(old_value, &new_value) {
proxy_wide_metrics.insert(metric_name.to_string(), updated_value);
}
}
None => {
proxy_wide_metrics.insert(metric_name.to_string(), new_value.to_owned());
}
}
}
acc.insert("metrics_with_no_cluster".to_string(), proxy_wide_metrics);

for (_worker_id, worker_metrics) in aggregated_metrics.workers {
for (metric_name, new_value) in &worker_metrics.proxy {
match acc.get_mut("metrics_with_no_cluster") {
Some(aggregated_map) => match aggregated_map.get_mut(metric_name) {
Some(old_value) => {
if let Some(updated_value) =
aggregate_filtered_metrics(old_value, &new_value)
{
aggregated_map.insert(metric_name.to_string(), updated_value);
} else {
aggregated_map.insert(metric_name.to_owned(), new_value.to_owned());
}
}
None => {
aggregated_map.insert(metric_name.to_string(), new_value.to_owned());
}
},
None => {}
}
}

for (cluster_id, cluster_metrics) in &worker_metrics.clusters {
for (metric_name, new_value) in &cluster_metrics.cluster {
match acc.get_mut(cluster_id) {
Some(aggregated_map) => {
if let Some(old_value) = aggregated_map.get_mut(metric_name) {
if let Some(updated_value) =
aggregate_filtered_metrics(old_value, &new_value)
{
aggregated_map.insert(metric_name.to_owned(), updated_value);
}
} else {
aggregated_map.insert(metric_name.to_owned(), new_value.to_owned());
}
}
None => {
let mut new_map = HashMap::new();
new_map.insert(metric_name.to_owned(), new_value.to_owned());
acc.insert(cluster_id.to_owned(), new_map);
}
}
}

for backend_metrics in &cluster_metrics.backends {
for (metric_name, new_value) in &backend_metrics.metrics {
match acc.get_mut(cluster_id) {
Some(aggregated_map) => {
if let Some(old_value) = aggregated_map.get_mut(metric_name) {
if let Some(updated_value) =
aggregate_filtered_metrics(old_value, &new_value)
{
aggregated_map.insert(metric_name.to_owned(), updated_value);
}
} else {
aggregated_map.insert(metric_name.to_owned(), new_value.to_owned());
}
}
None => {
let mut new_map = HashMap::new();
new_map.insert(metric_name.to_owned(), new_value.to_owned());
acc.insert(cluster_id.to_owned(), new_map);
}
}
}
}
}
}
labeled_metrics

for (cluster_id, metrics) in acc {
for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
if &cluster_id != "metrics_with_no_cluster" {
labeled.with_label("cluster_id", &cluster_id);
}
labeled_metrics.push(labeled);
}
}
return labeled_metrics;
}

fn aggregate_filtered_metrics(
Expand Down

0 comments on commit 3b26d18

Please sign in to comment.