diff --git a/rust-toolchain b/rust-toolchain index d624c9f..9006c0b 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.67.1 \ No newline at end of file +1.70.0 \ No newline at end of file diff --git a/src/svc/http/server/handler.rs b/src/svc/http/server/handler.rs index 1318831..b32daec 100644 --- a/src/svc/http/server/handler.rs +++ b/src/svc/http/server/handler.rs @@ -14,7 +14,7 @@ use sozu_client::Sender; use sozu_command_lib::proto::command::{ self, request::RequestType, response_content::ContentType, QueryMetricsOptions, ResponseContent, }; -use tracing::error; +use tracing::{error, debug}; use crate::svc::{http::server, telemetry::prometheus::convert_metrics_to_prometheus}; @@ -85,7 +85,7 @@ pub async fn telemetry(State(state): State, _req: Request) // ------------------------------------------------------------------------- // Query Sōzu to get its internal metrics - + debug!("Querying Sōzu metrics"); let mut sozu_metrics = match state .client .send(RequestType::QueryMetrics(QueryMetricsOptions::default())) diff --git a/src/svc/http/server/mod.rs b/src/svc/http/server/mod.rs index 2badcea..8ff4178 100644 --- a/src/svc/http/server/mod.rs +++ b/src/svc/http/server/mod.rs @@ -12,7 +12,7 @@ use axum::{ use hyper::Server; use sozu_client::{channel::ConnectionProperties, config::canonicalize_command_socket, Client}; use sozu_command_lib::config::Config; -use tracing::info; +use tracing::{debug, info}; use crate::svc::config::ConnectorConfiguration; @@ -65,6 +65,7 @@ pub async fn serve( opts.socket = canonicalize_command_socket(&config.sozu.configuration, &sozu_config) .map_err(Error::CanonicalizeSocket)?; } + debug!("Sōzu command socket is {:?}", opts.socket); let client = Client::try_new(opts).await.map_err(Error::CreateClient)?; let state = State::from(client); diff --git a/src/svc/telemetry/prometheus.rs b/src/svc/telemetry/prometheus.rs index c9583e8..d177a8c 100644 --- a/src/svc/telemetry/prometheus.rs +++ b/src/svc/telemetry/prometheus.rs @@ -1,39 +1,158 @@ +use std::fmt::Display; + use sozu_command_lib::proto::command::{ - filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics, Percentiles, + filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics, }; +use tracing::debug; + +#[derive(PartialEq)] +enum MetricType { + Counter, + Gauge, + // Histogram, + Unsupported, +} + +impl Display for MetricType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + MetricType::Counter => write!(f, "counter"), + MetricType::Gauge => write!(f, "gauge"), + // MetricType::Histogram => write!(f, "histogram"), + MetricType::Unsupported => write!(f, "unsupported"), // should never happen + } + } +} + +/// convertible to prometheus metric in this form: +/// metric_name{label="something",second_lable="something-else"} value +struct LabeledMetric { + metric_name: String, + labels: Vec<(String, String)>, + value: FilteredMetrics, + metric_type: MetricType, +} + +impl LabeledMetric { + fn with_name(&mut self, name: &str) { + self.metric_name = name.to_owned(); + } + + fn with_label(&mut self, label_name: &str, label_value: &str) { + self.labels + .push((label_name.to_owned(), label_value.to_owned())); + } + + /// remove dots from the name, replace with underscores + fn printable_name(&self) -> String { + self.metric_name.replace('.', "_") + } + + /// # TYPE protocol_https gauge + fn type_line(&self) -> String { + let printable_metric_name = self.printable_name(); + format!("# TYPE {} {}", printable_metric_name, self.metric_type) + } + + /// http_active_requests{worker="0"} 0 + fn metric_line(&self) -> String { + let printable_metric_name = self.printable_name(); + let formatted_labels: String = self + .labels + .iter() + .map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value)) + .collect(); + let value = match &self.value.inner { + Some(inner) => { + match inner { + Inner::Gauge(value) => value.to_string(), + Inner::Count(value) => value.to_string(), + Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => { + // should not happen at that point + return String::new(); + } + } + } + None => return String::new(), + }; + format!( + "{}{{{}}} {}", + printable_metric_name, formatted_labels, value + ) + } +} + +impl From for LabeledMetric { + fn from(value: FilteredMetrics) -> Self { + let metric_type = match &value.inner { + Some(inner) => match inner { + Inner::Gauge(_) => MetricType::Gauge, + Inner::Count(_) => MetricType::Counter, + Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => { + MetricType::Unsupported + } + }, + None => MetricType::Unsupported, + }; + Self { + metric_name: String::new(), + labels: Vec::new(), + value, + metric_type, + } + } +} /// Convert aggregated metrics into prometheus serialize one #[tracing::instrument(skip_all)] pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String { - let mut formatted_for_prometheus = "".to_string(); + debug!("Converting metrics to prometheus format"); + let labeled_metrics = apply_labels(aggregated_metrics); + + let metric_names = get_unique_metric_names(&labeled_metrics); + + let mut prometheus_metrics = String::new(); + + for metric_name in metric_names { + prometheus_metrics.push_str(&produce_lines_for_one_metric_name( + &labeled_metrics, + &metric_name, + )); + } + + prometheus_metrics +} + +/// assign worker_id and cluster_id as labels +fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec { + let mut labeled_metrics = Vec::new(); // metrics of the main process - for (metric_name, filtered_metric) in aggregated_metrics.main.iter() { - let metric_lines = create_metric_lines(metric_name, &[("worker", "main")], filtered_metric); - formatted_for_prometheus.push_str(&metric_lines); + for (metric_name, value) in aggregated_metrics.main.iter() { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(metric_name); + labeled.with_label("worker", "main"); + + labeled_metrics.push(labeled); } // worker metrics for (worker_id, worker_metrics) in aggregated_metrics.workers { // proxy metrics (bytes in, accept queue…) - for (metric_name, filtered_metric) in worker_metrics.proxy { - let metric_lines = create_metric_lines( - &metric_name, - &[("worker", worker_id.as_str())], - &filtered_metric, - ); - formatted_for_prometheus.push_str(&metric_lines); + for (metric_name, value) in worker_metrics.proxy { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + labeled.with_label("worker", &worker_id); + labeled_metrics.push(labeled); } // cluster metrics (applications) for (cluster_id, cluster_metrics) in worker_metrics.clusters { - for (metric_name, filtered_metric) in cluster_metrics.cluster { - let metric_lines = create_metric_lines( - &metric_name, - &[("cluster_id", cluster_id.as_str())], - &filtered_metric, - ); - formatted_for_prometheus.push_str(&metric_lines); + for (metric_name, value) in cluster_metrics.cluster { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + labeled.with_label("cluster_id", &cluster_id); + labeled_metrics.push(labeled); } // backend metrics (several backends for a given cluster) @@ -43,62 +162,66 @@ pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> S metrics, } = backend_metrics; - for (metric_name, filtered_metric) in metrics { - let metric_lines = create_metric_lines( - &metric_name, - &[ - ("cluster_id", cluster_id.as_str()), - ("backend_id", backend_id.as_str()), - ], - &filtered_metric, - ); - formatted_for_prometheus.push_str(&metric_lines); + for (metric_name, value) in metrics { + let mut labeled = LabeledMetric::from(value.clone()); + labeled.with_name(&metric_name); + labeled.with_label("cluster_id", &cluster_id); + labeled.with_label("backend_id", &backend_id); + labeled_metrics.push(labeled); } } } } + labeled_metrics +} - formatted_for_prometheus +fn get_unique_metric_names(labeled_metrics: &Vec) -> Vec { + let mut names = Vec::new(); + for metric in labeled_metrics { + if !names.contains(&metric.metric_name) { + names.push(metric.metric_name.clone()); + } + } + names } -#[tracing::instrument(skip(filtered_metric))] -fn create_metric_lines( +fn produce_lines_for_one_metric_name( + labeled_metrics: &Vec, metric_name: &str, - labels: &[(&str, &str)], - filtered_metric: &FilteredMetrics, ) -> String { let mut lines = String::new(); - // skip percentiles entirely to avoid newlines - if matches!(filtered_metric.inner, Some(Inner::Percentiles(_))) { - return lines; - } - let metric_name = replace_dots_with_underscores(metric_name); - let type_line = create_type_line(&metric_name, filtered_metric); - let metric_lines = match &filtered_metric.inner { - Some(inner) => match inner { - Inner::Gauge(value) => create_metric_line_with_labels(&metric_name, labels, value), - Inner::Count(value) => create_metric_line_with_labels(&metric_name, labels, value), - Inner::Time(value) => create_metric_line_with_labels(&metric_name, labels, value), - Inner::TimeSerie(value) => create_metric_line_with_labels(&metric_name, labels, value), - Inner::Percentiles(_percentiles) => { - // skip conversion of percentiles - // this was useless and misleading since percentiles are not a prometheus metric format - // TODO: convert Sōzu histograms once they are produced by sozu_command_lib - // create_percentile_lines(&metric_name, labels, percentiles) - String::new() - } - }, - None => "none".to_string(), // very very unlikely + // find the first item to produce the type line only once + let first_item = match labeled_metrics + .iter() + .find(|metric| metric.metric_name == metric_name) + { + Some(item) => item, + None => return String::new(), }; - lines.push_str(&type_line); - lines.push('\n'); - lines.push_str(&metric_lines); + if first_item.metric_type == MetricType::Unsupported { + return String::new(); + } + lines.push_str(&first_item.type_line()); lines.push('\n'); + + for metric in labeled_metrics { + if metric.metric_name == metric_name { + lines.push_str(&metric.metric_line()); + lines.push('\n'); + } + } + lines } -/// this is all false +#[tracing::instrument(skip_all)] +fn replace_dots_with_underscores(str: &str) -> String { + str.replace('.', "_") +} + +/* this is all false + /// convert a Sōzu Percentiles struct into prometheus histogram lines: /// ``` /// # TYPE metric_name histogram @@ -176,46 +299,4 @@ where create_metric_line_with_labels(bucket_name, &labels, value) } -#[tracing::instrument(skip_all)] -fn replace_dots_with_underscores(str: &str) -> String { - str.replace('.', "_") -} - -#[tracing::instrument(skip_all)] -fn get_metric_type(filtered_metric: &FilteredMetrics) -> String { - match &filtered_metric.inner { - Some(inner) => match inner { - Inner::Gauge(_) => "gauge".to_string(), - Inner::Count(_) => "counter".to_string(), - Inner::Time(_) => "time".to_string(), - Inner::Percentiles(_) => "histogram".to_string(), - Inner::TimeSerie(_) => "time series".to_string(), - }, - None => "none".to_string(), // very very unlikely - } -} - -// typically: -// # TYPE service_time percentiles -#[tracing::instrument(skip_all)] -fn create_type_line(name: &str, filtered_metric: &FilteredMetrics) -> String { - // temporary fix to skip conversion of percentiles - if matches!(filtered_metric.inner, Some(Inner::Percentiles(_))) { - return String::new(); - } - format!("# TYPE {} {}", name, get_metric_type(filtered_metric)) -} - -// typically: -// http_active_requests{worker="0"} 0 -#[tracing::instrument(skip_all)] -fn create_metric_line_with_labels(name: &str, labels: &[(&str, &str)], value: T) -> String -where - T: ToString, -{ - let formatted_labels: String = labels - .iter() - .map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value)) - .collect(); - format!("{}{{{}}} {}", name, formatted_labels, value.to_string()) -} +*/