Skip to content

Commit

Permalink
aggregate-backend-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Oct 24, 2023
1 parent 0a4bca0 commit d259a1f
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
rust:
- stable
- beta
- 1.67.1 # MSRV
- 1.70.0 # MSRV
include:
- rust: nightly
experimental: true
Expand Down
3 changes: 3 additions & 0 deletions example.config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Socket address on which to listen
listening-address = "0.0.0.0:3000"

# Aggregate backend metrics by cluster id instead of reporting them per backend
aggregate-backend-metrics = true

[sozu]
# Path to Sōzu's configuration file
configuration = "path/to/sozu/config.toml"
Expand Down
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ async fn main(args: Args) -> Result<(), Error> {
// -------------------------------------------------------------------------
// Initialize logging system
let _guard = match &config.sentry {
Some(sentry_ctx) => logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned())
.map_err(Error::Logging)?,
Some(sentry_ctx) => {
logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned())
.map_err(Error::Logging)?
}
None => logging::initialize(args.verbosity as usize)
.map(|_| LoggingInitGuard::default())
.map_err(Error::Logging)?,
Expand Down
2 changes: 2 additions & 0 deletions src/svc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct Sozu {
pub struct ConnectorConfiguration {
#[serde(rename = "listening-address")]
pub listening_address: SocketAddr,
#[serde(rename = "aggregate-backend-metrics")]
pub aggregate_backend_metrics: bool,
#[serde(rename = "sozu")]
pub sozu: Sozu,
#[serde(rename = "sentry")]
Expand Down
7 changes: 5 additions & 2 deletions src/svc/http/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sozu_client::Sender;
use sozu_command_lib::proto::command::{
self, request::RequestType, response_content::ContentType, QueryMetricsOptions, ResponseContent,
};
use tracing::{error, debug};
use tracing::{debug, error};

use crate::svc::{http::server, telemetry::prometheus::convert_metrics_to_prometheus};

Expand Down Expand Up @@ -97,7 +97,10 @@ pub async fn telemetry(State(state): State<server::State>, _req: Request<Body>)
content_type: Some(ContentType::Metrics(aggregated_metrics)),
}),
..
}) => convert_metrics_to_prometheus(aggregated_metrics),
}) => convert_metrics_to_prometheus(
aggregated_metrics,
state.config.aggregate_backend_metrics,
),
Ok(response) => {
let headers = res.headers_mut();
let message = serde_json::json!({
Expand Down
10 changes: 5 additions & 5 deletions src/svc/http/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ pub enum Error {
/// State shared with the HTTP handlers of this server (e.g. the `telemetry`
/// handler receives it through the `State` extractor).
#[derive(Clone, Debug)]
pub struct State {
/// Client created in `serve` via `Client::try_new`, used to talk to Sōzu.
pub client: Client,
/// Connector configuration; the telemetry handler reads
/// `aggregate_backend_metrics` from it.
pub config: Arc<ConnectorConfiguration>,
}

impl From<Client> for State {
#[tracing::instrument]
fn from(client: Client) -> Self {
Self { client }
impl State {
fn new(client: Client, config: Arc<ConnectorConfiguration>) -> Self {
Self { client, config }
}
}

Expand All @@ -68,7 +68,7 @@ pub async fn serve(
debug!("Sōzu command socket is {:?}", opts.socket);

let client = Client::try_new(opts).await.map_err(Error::CreateClient)?;
let state = State::from(client);
let state = State::new(client, config.to_owned());

// -------------------------------------------------------------------------
// Create router
Expand Down
98 changes: 75 additions & 23 deletions src/svc/telemetry/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};

use sozu_command_lib::proto::command::{
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics,
Expand Down Expand Up @@ -59,11 +59,7 @@ impl LabeledMetric {
/// http_active_requests{worker="0"} 0
fn metric_line(&self) -> String {
let printable_metric_name = self.printable_name();
let formatted_labels: String = self
.labels
.iter()
.map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value))
.collect();
let formatted_labels: String = format_labels(&self.labels);
let value = match &self.value.inner {
Some(inner) => {
match inner {
Expand Down Expand Up @@ -107,9 +103,12 @@ impl From<FilteredMetrics> for LabeledMetric {

/// Convert aggregated metrics into their Prometheus serialized form
#[tracing::instrument(skip_all)]
pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String {
pub fn convert_metrics_to_prometheus(
aggregated_metrics: AggregatedMetrics,
convert_backend_metrics: bool,
) -> String {
debug!("Converting metrics to prometheus format");
let labeled_metrics = apply_labels(aggregated_metrics);
let labeled_metrics = apply_labels(aggregated_metrics, convert_backend_metrics);

let metric_names = get_unique_metric_names(&labeled_metrics);

Expand All @@ -126,7 +125,10 @@ pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> S
}

/// assign worker_id and cluster_id as labels
fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
fn apply_labels(
aggregated_metrics: AggregatedMetrics,
aggregate_backend_metrics: bool,
) -> Vec<LabeledMetric> {
let mut labeled_metrics = Vec::new();

// metrics of the main process
Expand Down Expand Up @@ -157,26 +159,71 @@ fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
labeled_metrics.push(labeled);
}

// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in metrics {
if aggregate_backend_metrics {
let aggregated_values = cluster_metrics.backends.iter().fold(
HashMap::new(),
|mut acc, backend_metrics| {
for (metric_name, value) in &backend_metrics.metrics {
match acc.get_mut(&metric_name) {
Some(aggregated) => {
if let Some(new_value) =
aggregate_filtered_metrics(aggregated, value)
{
acc.insert(metric_name, new_value);
}
}
None => {
acc.insert(metric_name, value.to_owned());
}
}
}
acc
},
);

for (metric_name, value) in aggregated_values {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_name(metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
} else {
// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
}
}
}
}
labeled_metrics
}

/// Add two metrics of the same kind together.
///
/// Returns `Some` holding the sum when both sides are `Inner::Gauge` or both
/// are `Inner::Count`. Returns `None` for every other combination: a missing
/// `inner` on either side, mismatched kinds, or a kind other than gauge/count.
fn aggregate_filtered_metrics(
    left: &FilteredMetrics,
    right: &FilteredMetrics,
) -> Option<FilteredMetrics> {
    // A metric without an inner value cannot be summed with anything.
    let summed = match (left.inner.as_ref()?, right.inner.as_ref()?) {
        (Inner::Gauge(lhs), Inner::Gauge(rhs)) => Inner::Gauge(lhs + rhs),
        (Inner::Count(lhs), Inner::Count(rhs)) => Inner::Count(lhs + rhs),
        // Only gauge+gauge and count+count are aggregated.
        _ => return None,
    };

    Some(FilteredMetrics {
        inner: Some(summed),
    })
}

fn get_unique_metric_names(labeled_metrics: &Vec<LabeledMetric>) -> Vec<String> {
let mut names = Vec::new();
for metric in labeled_metrics {
Expand Down Expand Up @@ -222,6 +269,14 @@ fn replace_dots_with_underscores(str: &str) -> String {
str.replace('.', "_")
}

/// Render label pairs as `name="value"` entries separated by commas.
///
/// The output carries no surrounding braces and no trailing comma; an empty
/// slice yields an empty string. Names and values are written verbatim.
fn format_labels(labels: &[(String, String)]) -> String {
    let mut rendered = String::new();
    for (index, (label_name, label_value)) in labels.iter().enumerate() {
        // Separator goes between entries only, never before the first one.
        if index > 0 {
            rendered.push(',');
        }
        rendered.push_str(label_name);
        rendered.push_str("=\"");
        rendered.push_str(label_value);
        rendered.push('"');
    }
    rendered
}

#[cfg(test)]
mod test {
use std::collections::BTreeMap;
Expand Down Expand Up @@ -259,15 +314,12 @@ mod test {
let mut workers = BTreeMap::new();
workers.insert("WORKER-01".to_string(), worker_metrics);


let aggregated_metrics = AggregatedMetrics {
main: BTreeMap::new(),
workers,
};



let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics);
let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics, true);

let expected = r#"# TYPE http_response_status gauge
http_response_status{cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3Dvalue"} 3
Expand Down

0 comments on commit d259a1f

Please sign in to comment.