aggregate backend metrics #11

Merged on Oct 24, 2023 (2 commits)

2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -12,7 +12,7 @@ jobs:
rust:
- stable
- beta
- 1.67.1 # MSRV
- 1.70.0 # MSRV
include:
- rust: nightly
experimental: true
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -4,7 +4,7 @@ description = "This application retrieve internals metrics of Sōzu and format t
homepage = "https://github.com/CleverCloud/sozu-prometheus-connector"
documentation = "https://github.com/CleverCloud/sozu-prometheus-connector/blob/main/README.md"
version = "0.1.3"
rust-version = "1.67.0"
rust-version = "1.70.0"
edition = "2021"
license-file = "LICENSE"
authors = ["Emmanuel Bosquet <[email protected]>", "Florentin Dubois <[email protected]>"]
3 changes: 3 additions & 0 deletions example.config.toml
@@ -1,6 +1,9 @@
# Socket address on which to listen
listening-address = "0.0.0.0:3000"

# aggregate backend metrics by cluster id
aggregate-backend-metrics = true

[sozu]
# Path to Sōzu's configuration file
configuration = "path/to/sozu/config.toml"
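To make the new option concrete, here is an illustrative sample of the exposed series for a hypothetical cluster with two backends (the metric, cluster, and backend names are made up; the real ones come from Sōzu). With aggregate-backend-metrics = false, every backend keeps its own series:

bytes_out{cluster_id="my-cluster",backend_id="backend-0"} 120
bytes_out{cluster_id="my-cluster",backend_id="backend-1"} 80

With aggregate-backend-metrics = true, gauge and count values are summed across the cluster's backends and exposed as a single series labeled only with the cluster id:

bytes_out{cluster_id="my-cluster"} 200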
2 changes: 1 addition & 1 deletion rust-toolchain
@@ -1 +1 @@
1.67.1
1.70.0
6 changes: 4 additions & 2 deletions src/main.rs
@@ -76,8 +76,10 @@ async fn main(args: Args) -> Result<(), Error> {
// -------------------------------------------------------------------------
// Initialize logging system
let _guard = match &config.sentry {
Some(sentry_ctx) => logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned())
.map_err(Error::Logging)?,
Some(sentry_ctx) => {
logging::initialize_with_sentry(args.verbosity as usize, sentry_ctx.to_owned())
.map_err(Error::Logging)?
}
None => logging::initialize(args.verbosity as usize)
.map(|_| LoggingInitGuard::default())
.map_err(Error::Logging)?,
2 changes: 2 additions & 0 deletions src/svc/config.rs
@@ -42,6 +42,8 @@ pub struct Sozu {
pub struct ConnectorConfiguration {
#[serde(rename = "listening-address")]
pub listening_address: SocketAddr,
#[serde(rename = "aggregate-backend-metrics")]
pub aggregate_backend_metrics: bool,
#[serde(rename = "sozu")]
pub sozu: Sozu,
#[serde(rename = "sentry")]
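As a quick illustration of how the kebab-case key from example.config.toml lands in this field, here is a self-contained sketch built on a simplified stand-in struct (not the crate's actual ConnectorConfiguration, which also carries the sozu and sentry sections); it assumes the serde (with the derive feature) and toml crates:

use std::net::SocketAddr;

use serde::Deserialize;

// Simplified stand-in for ConnectorConfiguration, reduced to the two fields
// relevant here.
#[derive(Debug, Deserialize)]
struct MinimalConfig {
    #[serde(rename = "listening-address")]
    listening_address: SocketAddr,
    #[serde(rename = "aggregate-backend-metrics")]
    aggregate_backend_metrics: bool,
}

fn main() {
    let raw = r#"
        listening-address = "0.0.0.0:3000"
        aggregate-backend-metrics = true
    "#;

    // serde's rename attribute maps the kebab-case keys onto the snake_case fields
    let config: MinimalConfig = toml::from_str(raw).expect("valid configuration");
    assert!(config.aggregate_backend_metrics);
    println!("{config:?}");
}

Since the new field is not marked with serde(default), configuration files written for earlier releases appear to need the new key before they parse again.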
7 changes: 5 additions & 2 deletions src/svc/http/server/handler.rs
@@ -14,7 +14,7 @@ use sozu_client::Sender;
use sozu_command_lib::proto::command::{
self, request::RequestType, response_content::ContentType, QueryMetricsOptions, ResponseContent,
};
use tracing::{error, debug};
use tracing::{debug, error};

use crate::svc::{http::server, telemetry::prometheus::convert_metrics_to_prometheus};

@@ -97,7 +97,10 @@ pub async fn telemetry(State(state): State<server::State>, _req: Request<Body>)
content_type: Some(ContentType::Metrics(aggregated_metrics)),
}),
..
}) => convert_metrics_to_prometheus(aggregated_metrics),
}) => convert_metrics_to_prometheus(
aggregated_metrics,
state.config.aggregate_backend_metrics,
),
Ok(response) => {
let headers = res.headers_mut();
let message = serde_json::json!({
10 changes: 5 additions & 5 deletions src/svc/http/server/mod.rs
@@ -40,12 +40,12 @@ pub enum Error {
#[derive(Clone, Debug)]
pub struct State {
pub client: Client,
pub config: Arc<ConnectorConfiguration>,
}

impl From<Client> for State {
#[tracing::instrument]
fn from(client: Client) -> Self {
Self { client }
impl State {
fn new(client: Client, config: Arc<ConnectorConfiguration>) -> Self {
Self { client, config }
}
}

@@ -68,7 +68,7 @@ pub async fn serve(
debug!("Sōzu command socket is {:?}", opts.socket);

let client = Client::try_new(opts).await.map_err(Error::CreateClient)?;
let state = State::from(client);
let state = State::new(client, config.to_owned());

// -------------------------------------------------------------------------
// Create router
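State now carries the parsed configuration so the HTTP handler can read the aggregation flag. A small self-contained sketch, with hypothetical stand-in types rather than the crate's own, of why wrapping the configuration in an Arc fits here: axum's State extractor clones the application state for every request that uses it, and cloning an Arc only bumps a reference count instead of copying the configuration.

use std::sync::Arc;

// Hypothetical stand-ins for ConnectorConfiguration and the State above.
#[derive(Clone, Debug)]
struct Config {
    aggregate_backend_metrics: bool,
}

#[derive(Clone, Debug)]
struct State {
    config: Arc<Config>,
}

fn main() {
    let state = State {
        config: Arc::new(Config {
            aggregate_backend_metrics: true,
        }),
    };

    // what happens per request: the state is cloned, which only clones the Arc
    let per_request = state.clone();
    assert!(Arc::ptr_eq(&state.config, &per_request.config));
    println!("{per_request:?}");
}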
98 changes: 75 additions & 23 deletions src/svc/telemetry/prometheus.rs
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};

use sozu_command_lib::proto::command::{
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, FilteredMetrics,
@@ -59,11 +59,7 @@ impl LabeledMetric {
/// http_active_requests{worker="0"} 0
fn metric_line(&self) -> String {
let printable_metric_name = self.printable_name();
let formatted_labels: String = self
.labels
.iter()
.map(|(label_name, label_value)| format!("{}=\"{}\"", label_name, label_value))
.collect();
let formatted_labels: String = format_labels(&self.labels);
let value = match &self.value.inner {
Some(inner) => {
match inner {
@@ -107,9 +103,12 @@ impl From<FilteredMetrics> for LabeledMetric {

/// Convert aggregated metrics into Prometheus-formatted text
#[tracing::instrument(skip_all)]
pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> String {
pub fn convert_metrics_to_prometheus(
aggregated_metrics: AggregatedMetrics,
convert_backend_metrics: bool,
) -> String {
debug!("Converting metrics to prometheus format");
let labeled_metrics = apply_labels(aggregated_metrics);
let labeled_metrics = apply_labels(aggregated_metrics, convert_backend_metrics);

let metric_names = get_unique_metric_names(&labeled_metrics);

@@ -126,7 +125,10 @@ pub fn convert_metrics_to_prometheus(aggregated_metrics: AggregatedMetrics) -> S
}

/// assign worker_id and cluster_id as labels
fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
fn apply_labels(
aggregated_metrics: AggregatedMetrics,
aggregate_backend_metrics: bool,
) -> Vec<LabeledMetric> {
let mut labeled_metrics = Vec::new();

// metrics of the main process
@@ -157,26 +159,71 @@ fn apply_labels(aggregated_metrics: AggregatedMetrics) -> Vec<LabeledMetric> {
labeled_metrics.push(labeled);
}

// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in metrics {
if aggregate_backend_metrics {
let aggregated_values = cluster_metrics.backends.iter().fold(
HashMap::new(),
|mut acc, backend_metrics| {
for (metric_name, value) in &backend_metrics.metrics {
match acc.get_mut(&metric_name) {
Some(aggregated) => {
if let Some(new_value) =
aggregate_filtered_metrics(aggregated, value)
{
acc.insert(metric_name, new_value);
}
}
None => {
acc.insert(metric_name, value.to_owned());
}
}
}
acc
},
);

for (metric_name, value) in aggregated_values {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_name(metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
} else {
// backend metrics (several backends for a given cluster)
for backend_metrics in cluster_metrics.backends {
let BackendMetrics {
backend_id,
metrics,
} = backend_metrics;

for (metric_name, value) in metrics {
let mut labeled = LabeledMetric::from(value.clone());
labeled.with_name(&metric_name);
labeled.with_label("cluster_id", &cluster_id);
labeled.with_label("backend_id", &backend_id);
labeled_metrics.push(labeled);
}
}
}
}
}
labeled_metrics
}

fn aggregate_filtered_metrics(
left: &FilteredMetrics,
right: &FilteredMetrics,
) -> Option<FilteredMetrics> {
match (&left.inner, &right.inner) {
(Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => Some(FilteredMetrics {
inner: Some(Inner::Gauge(a + b)),
}),
(Some(Inner::Count(a)), Some(Inner::Count(b))) => Some(FilteredMetrics {
inner: Some(Inner::Count(a + b)),
}),
_ => None,
}
}

fn get_unique_metric_names(labeled_metrics: &Vec<LabeledMetric>) -> Vec<String> {
let mut names = Vec::new();
for metric in labeled_metrics {
@@ -222,6 +269,14 @@ fn replace_dots_with_underscores(str: &str) -> String {
str.replace('.', "_")
}

fn format_labels(labels: &[(String, String)]) -> String {
labels
.iter()
.map(|(name, value)| format!("{}=\"{}\"", name, value))
.collect::<Vec<_>>()
.join(",")
}

#[cfg(test)]
mod test {
use std::collections::BTreeMap;
@@ -259,15 +314,12 @@ mod test {
let mut workers = BTreeMap::new();
workers.insert("WORKER-01".to_string(), worker_metrics);


let aggregated_metrics = AggregatedMetrics {
main: BTreeMap::new(),
workers,
};



let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics);
let prometheus_metrics = convert_metrics_to_prometheus(aggregated_metrics, true);

let expected = r#"# TYPE http_response_status gauge
http_response_status{cluster_id="http%3A%2F%2Fmy-cluster-id.com%2Fapi%3Fparam%3Dvalue"} 3
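Since aggregate_filtered_metrics is a private helper, the natural place to exercise it is this file's existing test module. A minimal sketch of such a test, assuming the module pulls the parent items into scope (for instance via use super::*) and that the Gauge and Count variants wrap plain integers, as the a + b arithmetic above suggests:

#[test]
fn aggregate_filtered_metrics_sums_matching_kinds_only() {
    let gauge = |v| FilteredMetrics { inner: Some(Inner::Gauge(v)) };
    let count = |v| FilteredMetrics { inner: Some(Inner::Count(v)) };

    // two gauges aggregate into their sum
    let summed = aggregate_filtered_metrics(&gauge(3), &gauge(5));
    assert!(matches!(summed, Some(ref m) if m.inner == Some(Inner::Gauge(8))));

    // a gauge and a count cannot be combined, so the helper returns None and
    // the fold in apply_labels keeps the value it has already accumulated
    assert!(aggregate_filtered_metrics(&gauge(3), &count(5)).is_none());
}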