Skip to content

Commit

Permalink
convert prometheus metrics with one type line for each metric
Browse files Browse the repository at this point in the history
debug statements
  • Loading branch information
Keksoj committed Oct 20, 2023
1 parent 973751e commit 632ba39
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 106 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.67.1
1.70.0
4 changes: 2 additions & 2 deletions src/svc/http/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sozu_client::Sender;
use sozu_command_lib::proto::command::{
self, request::RequestType, response_content::ContentType, QueryMetricsOptions, ResponseContent,
};
use tracing::error;
use tracing::{error, debug};

use crate::svc::{http::server, telemetry::prometheus::convert_metrics_to_prometheus};

Expand Down Expand Up @@ -85,7 +85,7 @@ pub async fn telemetry(State(state): State<server::State>, _req: Request<Body>)

// -------------------------------------------------------------------------
// Query Sōzu to get its internal metrics

debug!("Querying Sōzu metrics");
let mut sozu_metrics = match state
.client
.send(RequestType::QueryMetrics(QueryMetricsOptions::default()))
Expand Down
3 changes: 2 additions & 1 deletion src/svc/http/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::{
use hyper::Server;
use sozu_client::{channel::ConnectionProperties, config::canonicalize_command_socket, Client};
use sozu_command_lib::config::Config;
use tracing::info;
use tracing::{debug, info};

use crate::svc::config::ConnectorConfiguration;

Expand Down Expand Up @@ -65,6 +65,7 @@ pub async fn serve(
opts.socket = canonicalize_command_socket(&config.sozu.configuration, &sozu_config)
.map_err(Error::CanonicalizeSocket)?;
}
debug!("Sōzu command socket is {:?}", opts.socket);

let client = Client::try_new(opts).await.map_err(Error::CreateClient)?;
let state = State::from(client);
Expand Down
285 changes: 183 additions & 102 deletions src/svc/telemetry/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,158 @@
use std::fmt::Display;

use sozu_command_lib::proto::command::{
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics, Percentiles,
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics,
};
use tracing::debug;

#[derive(PartialEq)]
enum MetricType {
Counter,
Gauge,
// Histogram,
Unsupported,
}

impl Display for MetricType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
MetricType::Counter => write!(f, "counter"),
MetricType::Gauge => write!(f, "gauge"),
// MetricType::Histogram => write!(f, "histogram"),
MetricType::Unsupported => write!(f, "unsupported"), // should never happen
}
}
}

/// convertible to prometheus metric in this form:
/// metric_name{label="something",second_lable="something-else"} value
struct LabeledMetric {
metric_name: String,
labels: Vec<(String, String)>,
value: FilteredMetrics,
metric_type: MetricType,
}

impl LabeledMetric {
fn with_name(&mut self, name: &str) {
self.metric_name = name.to_owned();
}

fn with_label(&mut self, label_name: &str, label_value: &str) {
self.labels
.push((label_name.to_owned(), label_value.to_owned()));
}

/// remove dots from the name, replace with underscores
fn printable_name(&self) -> String {
self.metric_name.replace('.', "_")
}

/// # TYPE protocol_https gauge
fn type_line(&self) -> String {
let printable_metric_name = self.printable_name();
format!("# TYPE {} {}", printable_metric_name, self.metric_type)
}

/// http_active_requests{worker="0"} 0
fn metric_line(&self) -> String {
let printable_metric_name = self.printable_name();
let formatted_labels: String = self
.labels
.iter()
.map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value))
.collect();
let value = match &self.value.inner {
Some(inner) => {
match inner {
Inner::Gauge(value) => value.to_string(),
Inner::Count(value) => value.to_string(),
Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => {
// should not happen at that point
return String::new();
}
}
}
None => return String::new(),
};
format!(
"{}{{{}}} {}",
printable_metric_name, formatted_labels, value
)
}
}

impl From<FilteredMetrics> for LabeledMetric {
fn from(value: FilteredMetrics) -> Self {
let metric_type = match &value.inner {
Some(inner) => match inner {
Inner::Gauge(_) => MetricType::Gauge,
Inner::Count(_) => MetricType::Counter,
Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_) => {
MetricType::Unsupported
}
},
None => MetricType::Unsupported,
};
Self {
metric_name: String::new(),
labels: Vec::new(),
value,
metric_type,
}
}
}

/// Convert aggregated metrics into prometheus serialize one
#[tracing::instrument(skip_all)]
pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String {
let mut formatted_for_prometheus = "".to_string();
debug!("Converting metrics to prometheus format");
let labeled_metrics = apply_labels(aggregated_metrics);

let metric_names = get_unique_metric_names(&labeled_metrics);

let mut prometheus_metrics = String::new();

for metric_name in metric_names {
prometheus_metrics.push_str(&produce_lines_for_one_metric_name(
&labeled_metrics,
&metric_name,
));
}

prometheus_metrics
}

/// assign worker_id and cluster_id as labels
fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
let mut labeled_metrics = Vec::new();

// metrics of the main process
for (metric_name, filtered_metric) in aggregated_metrics.main.iter() {
let metric_lines = create_metric_lines(metric_name, &[("worker", "main")], filtered_metric);
formatted_for_prometheus.push_str(&metric_lines);
for (metric_name, value) in aggregated_metrics.main.iter() {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(metric_name);
labeled.with_label("worker", "main");

labeled_metrics.push(labeled);
}

// worker metrics
for (worker_id, worker_metrics) in aggregated_metrics.workers {
// proxy metrics (bytes in, accept queue…)
for (metric_name, filtered_metric) in worker_metrics.proxy {
let metric_lines = create_metric_lines(
&metric_name,
&[("worker", worker_id.as_str())],
&filtered_metric,
);
formatted_for_prometheus.push_str(&metric_lines);
for (metric_name, value) in worker_metrics.proxy {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("worker", &worker_id);
labeled_metrics.push(labeled);
}

// cluster metrics (applications)
for (cluster_id, cluster_metrics) in worker_metrics.clusters {
for (metric_name, filtered_metric) in cluster_metrics.cluster {
let metric_lines = create_metric_lines(
&metric_name,
&[("cluster_id", cluster_id.as_str())],
&filtered_metric,
);
formatted_for_prometheus.push_str(&metric_lines);
for (metric_name, value) in cluster_metrics.cluster {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled_metrics.push(labeled);
}

// backend metrics (several backends for a given cluster)
Expand All @@ -43,62 +162,66 @@ pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> S
metrics,
} = backend_metrics;

for (metric_name, filtered_metric) in metrics {
let metric_lines = create_metric_lines(
&metric_name,
&[
("cluster_id", cluster_id.as_str()),
("backend_id", backend_id.as_str()),
],
&filtered_metric,
);
formatted_for_prometheus.push_str(&metric_lines);
for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
}
}
}
labeled_metrics
}

formatted_for_prometheus
fn get_unique_metric_names(labeled_metrics: &Vec<LabeledMetric>) -> Vec<String> {
let mut names = Vec::new();
for metric in labeled_metrics {
if !names.contains(&metric.metric_name) {
names.push(metric.metric_name.clone());
}
}
names
}

#[tracing::instrument(skip(filtered_metric))]
fn create_metric_lines(
fn produce_lines_for_one_metric_name(
labeled_metrics: &Vec<LabeledMetric>,
metric_name: &str,
labels: &[(&str, &str)],
filtered_metric: &FilteredMetrics,
) -> String {
let mut lines = String::new();
// skip percentiles entirely to avoid newlines
if matches!(filtered_metric.inner, Some(Inner::Percentiles(_))) {
return lines;
}

let metric_name = replace_dots_with_underscores(metric_name);
let type_line = create_type_line(&metric_name, filtered_metric);
let metric_lines = match &filtered_metric.inner {
Some(inner) => match inner {
Inner::Gauge(value) => create_metric_line_with_labels(&metric_name, labels, value),
Inner::Count(value) => create_metric_line_with_labels(&metric_name, labels, value),
Inner::Time(value) => create_metric_line_with_labels(&metric_name, labels, value),
Inner::TimeSerie(value) => create_metric_line_with_labels(&metric_name, labels, value),
Inner::Percentiles(_percentiles) => {
// skip conversion of percentiles
// this was useless and misleading since percentiles are not a prometheus metric format
// TODO: convert Sōzu histograms once they are produced by sozu_command_lib
// create_percentile_lines(&metric_name, labels, percentiles)
String::new()
}
},
None => "none".to_string(), // very very unlikely
// find the first item to produce the type line only once
let first_item = match labeled_metrics
.iter()
.find(|metric| metric.metric_name == metric_name)
{
Some(item) => item,
None => return String::new(),
};
lines.push_str(&type_line);
lines.push('\n');
lines.push_str(&metric_lines);
if first_item.metric_type == MetricType::Unsupported {
return String::new();
}
lines.push_str(&first_item.type_line());
lines.push('\n');

for metric in labeled_metrics {
if metric.metric_name == metric_name {
lines.push_str(&metric.metric_line());
lines.push('\n');
}
}

lines
}

/// this is all false
#[tracing::instrument(skip_all)]
fn replace_dots_with_underscores(str: &str) -> String {
str.replace('.', "_")
}

/* this is all false
/// convert a Sōzu Percentiles struct into prometheus histogram lines:
/// ```
/// # TYPE metric_name histogram
Expand Down Expand Up @@ -176,46 +299,4 @@ where
create_metric_line_with_labels(bucket_name, &labels, value)
}
#[tracing::instrument(skip_all)]
fn replace_dots_with_underscores(str: &str) -> String {
str.replace('.', "_")
}

#[tracing::instrument(skip_all)]
fn get_metric_type(filtered_metric: &FilteredMetrics) -> String {
match &filtered_metric.inner {
Some(inner) => match inner {
Inner::Gauge(_) => "gauge".to_string(),
Inner::Count(_) => "counter".to_string(),
Inner::Time(_) => "time".to_string(),
Inner::Percentiles(_) => "histogram".to_string(),
Inner::TimeSerie(_) => "time series".to_string(),
},
None => "none".to_string(), // very very unlikely
}
}

// typically:
// # TYPE service_time percentiles
#[tracing::instrument(skip_all)]
fn create_type_line(name: &str, filtered_metric: &FilteredMetrics) -> String {
// temporary fix to skip conversion of percentiles
if matches!(filtered_metric.inner, Some(Inner::Percentiles(_))) {
return String::new();
}
format!("# TYPE {} {}", name, get_metric_type(filtered_metric))
}

// typically:
// http_active_requests{worker="0"} 0
#[tracing::instrument(skip_all)]
fn create_metric_line_with_labels<T>(name: &str, labels: &[(&str, &str)], value: T) -> String
where
T: ToString,
{
let formatted_labels: String = labels
.iter()
.map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value))
.collect();
format!("{}{{{}}} {}", name, formatted_labels, value.to_string())
}
*/

0 comments on commit 632ba39

Please sign in to comment.