diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b10411..4e81149 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ jobs: rust: - stable - beta - - 1.67.1 # MSRV + - 1.70.0 # MSRV include: - rust: nightly experimental: true diff --git a/example.config.toml b/example.config.toml index 5c7d102..da4d9f1 100644 --- a/example.config.toml +++ b/example.config.toml @@ -1,6 +1,9 @@ # Socket address on which to listen listening-address = "0.0.0.0:3000" +# aggregate backend metrics by cluster id +aggregate-backend-metrics = true + [sozu] # Path to Sōzu's configuration file configuration = "path/to/sozu/config.toml" diff --git a/src/main.rs b/src/main.rs index f1aecd4..90bac6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -76,8 +76,10 @@ async fn main(args: Args) -> Result<(), Error> { // ------------------------------------------------------------------------- // Initialize logging system let _guard = match &config.sentry { - Some(sentry_ctx) => logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned()) - .map_err(Error::Logging)?, + Some(sentry_ctx) => { + logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned()) + .map_err(Error::Logging)? + } None => logging::initialize(args.verbosity as usize) .map(|_| LoggingInitGuard::default()) .map_err(Error::Logging)?, diff --git a/src/svc/config.rs b/src/svc/config.rs index 9bafe91..78bf7f3 100644 --- a/src/svc/config.rs +++ b/src/svc/config.rs @@ -42,6 +42,8 @@ pub struct Sozu { pub struct ConnectorConfiguration { #[serde(rename = "listening-address")] pub listening_address: SocketAddr, + #[serde(rename = "aggregate-backend-metrics")] + pub aggregate_backend_metrics: bool, #[serde(rename = "sozu")] pub sozu: Sozu, #[serde(rename = "sentry")] diff --git a/src/svc/http/server/handler.rs b/src/svc/http/server/handler.rs index b32daec..f8b07b4 100644 --- a/src/svc/http/server/handler.rs +++ b/src/svc/http/server/handler.rs @@ -14,7 +14,7 @@ use sozu_client::Sender; use sozu_command_lib::proto::command::{ self, request::RequestType, response_content::ContentType, QueryMetricsOptions, ResponseContent, }; -use tracing::{error, debug}; +use tracing::{debug, error}; use crate::svc::{http::server, telemetry::prometheus::convert_metrics_to_prometheus}; @@ -97,7 +97,10 @@ pub async fn telemetry(State(state): State, _req: Request) content_type: Some(ContentType::Metrics(aggregated_metrics)), }), .. - }) => convert_metrics_to_prometheus(aggregated_metrics), + }) => convert_metrics_to_prometheus( + aggregated_metrics, + state.config.aggregate_backend_metrics, + ), Ok(response) => { let headers = res.headers_mut(); let message = serde_json::json!({ diff --git a/src/svc/http/server/mod.rs b/src/svc/http/server/mod.rs index 8ff4178..bd87aff 100644 --- a/src/svc/http/server/mod.rs +++ b/src/svc/http/server/mod.rs @@ -40,12 +40,12 @@ pub enum Error { #[derive(Clone, Debug)] pub struct State { pub client: Client, + pub config: Arc, } -impl From for State { - #[tracing::instrument] - fn from(client: Client) -> Self { - Self { client } +impl State { + fn new(client: Client, config: Arc) -> Self { + Self { client, config } } } @@ -68,7 +68,7 @@ pub async fn serve( debug!("Sōzu command socket is {:?}", opts.socket); let client = Client::try_new(opts).await.map_err(Error::CreateClient)?; - let state = State::from(client); + let state = State::new(client, config.to_owned()); // ------------------------------------------------------------------------- // Create router diff --git a/src/svc/telemetry/prometheus.rs b/src/svc/telemetry/prometheus.rs index 8c9fd96..06ea40b 100644 --- a/src/svc/telemetry/prometheus.rs +++ b/src/svc/telemetry/prometheus.rs @@ -1,4 +1,4 @@ -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use sozu_command_lib::proto::command::{ filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics, @@ -59,11 +59,7 @@ impl LabeledMetric { /// http_active_requests{worker="0"} 0 fn metric_line(&self) -> String { let printable_metric_name = self.printable_name(); - let formatted_labels: String = self - .labels - .iter() - .map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value)) - .collect(); + let formatted_labels: String = format_labels(&self.labels); let value = match &self.value.inner { Some(inner) => { match inner { @@ -107,9 +103,12 @@ impl From for LabeledMetric { /// Convert aggregated metrics into prometheus serialize one #[tracing::instrument(skip_all)] -pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String { +pub fn convert_metrics_to_prometheus( + aggregated_metrics: AggregatedMetrics, + convert_backend_metrics: bool, +) -> String { debug!("Converting metrics to prometheus format"); - let labeled_metrics = apply_labels(aggregated_metrics); + let labeled_metrics = apply_labels(aggregated_metrics, convert_backend_metrics); let metric_names = get_unique_metric_names(&labeled_metrics); @@ -126,7 +125,10 @@ pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> S } /// assign worker_id and cluster_id as labels -fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec { +fn apply_labels( + aggregated_metrics: AggregatedMetrics, + aggregate_backend_metrics: bool, +) -> Vec { let mut labeled_metrics = Vec::new(); // metrics of the main process @@ -157,26 +159,71 @@ fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec { labeled_metrics.push(labeled); } - // backend metrics (several backends for a given cluster) - for backend_metrics in cluster_metrics.backends { - let BackendMetrics { - backend_id, - metrics, - } = backend_metrics; - - for (metric_name, value) in metrics { + if aggregate_backend_metrics { + let aggregated_values = cluster_metrics.backends.iter().fold( + HashMap::new(), + |mut acc, backend_metrics| { + for (metric_name, value) in &backend_metrics.metrics { + match acc.get_mut(&metric_name) { + Some(aggregated) => { + if let Some(new_value) = + aggregate_filtered_metrics(aggregated, value) + { + acc.insert(metric_name, new_value); + } + } + None => { + acc.insert(metric_name, value.to_owned()); + } + } + } + acc + }, + ); + + for (metric_name, value) in aggregated_values { let mut labeled = LabeledMetric::from(value.clone()); - labeled.with_name(&metric_name); + labeled.with_name(metric_name); labeled.with_label("cluster_id", &cluster_id); - labeled.with_label("backend_id", &backend_id); labeled_metrics.push(labeled); } + } else { + // backend metrics (several backends for a given cluster) + for backend_metrics in cluster_metrics.backends { + let BackendMetrics { + backend_id, + metrics, + } = backend_metrics; + + for (metric_name, value) in metrics { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + labeled.with_label("cluster_id", &cluster_id); + labeled.with_label("backend_id", &backend_id); + labeled_metrics.push(labeled); + } + } } } } labeled_metrics } +fn aggregate_filtered_metrics( + left: &FilteredMetrics, + right: &FilteredMetrics, +) -> Option { + match (&left.inner, &right.inner) { + (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => Some(FilteredMetrics { + inner: Some(Inner::Gauge(a + b)), + }), + (Some(Inner::Count(a)), Some(Inner::Count(b))) => Some(FilteredMetrics { + inner: Some(Inner::Count(a + b)), + }), + _ => None, + } +} + fn get_unique_metric_names(labeled_metrics: &Vec) -> Vec { let mut names = Vec::new(); for metric in labeled_metrics { @@ -222,6 +269,14 @@ fn replace_dots_with_underscores(str: &str) -> String { str.replace('.', "_") } +fn format_labels(labels: &[(String, String)]) -> String { + labels + .iter() + .map(|(name, value)| format!("{}=\"{}\"", name, value)) + .collect::>() + .join(",") +} + #[cfg(test)] mod test { use std::collections::BTreeMap; @@ -259,15 +314,12 @@ mod test { let mut workers = BTreeMap::new(); workers.insert("WORKER-01".to_string(), worker_metrics); - let aggregated_metrics = AggregatedMetrics { main: BTreeMap::new(), workers, }; - - - let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics); + let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics, true); let expected = r#"# TYPE http_response_status gauge http_response_status{cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3Dvalue"} 3