
🧐 Add metrics for storage service
TilBlechschmidt committed Mar 22, 2021
1 parent 6959e21 commit f1e969d
Showing 9 changed files with 119 additions and 17 deletions.
7 changes: 7 additions & 0 deletions core/src/libraries/helpers/keys.rs
@@ -182,4 +182,11 @@ pub mod metrics {
}
}
}

pub mod storage {
static_keys! {
CAPACITY = "metrics:storage:disk.bytes.total".to_string();
USAGE = "metrics:storage:disk.bytes.used".to_string();
}
}
}
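
Note: static_keys! is assumed to expand to lazily initialized String statics (the macro itself is not part of this diff), which is why call sites below dereference the keys as &*keys::metrics::storage::CAPACITY. A hand-written sketch of what the expansion likely resembles:

    use lazy_static::lazy_static;

    lazy_static! {
        // Hypothetical expansion of the static_keys! invocation above.
        pub static ref CAPACITY: String = "metrics:storage:disk.bytes.total".to_string();
        pub static ref USAGE: String = "metrics:storage:disk.bytes.used".to_string();
    }

Centralizing the key literals in this module is what lets the processor diff below drop its scattered string duplicates.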
4 changes: 2 additions & 2 deletions core/src/libraries/metrics/entry.rs
@@ -21,6 +21,6 @@ pub enum MetricsEntry {
OutgoingTraffic(u64),
RequestProcessed(Method, StatusCode),
SessionStarted(f64),
// TODO This needs to be used somewhere.
SessionStatusChange(SessionStatus),
StorageCapacityUpdated(String, f64),
StorageUsageUpdated(String, f64),
}
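
The two new variants carry a storage ID plus a byte count; the count is an f64 so it flows through the same numeric pipeline as the other metrics, and the processor below stringifies it with {:.0}. A construction sketch with hypothetical values:

    use crate::libraries::metrics::MetricsEntry;

    fn example_entries() -> Vec<MetricsEntry> {
        vec![
            // 32 GiB of total capacity on a hypothetical storage node.
            MetricsEntry::StorageCapacityUpdated("storage-eu-1".into(), 34_359_738_368.0),
            // 1 GiB currently in use on the same node.
            MetricsEntry::StorageUsageUpdated("storage-eu-1".into(), 1_073_741_824.0),
        ]
    }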
44 changes: 30 additions & 14 deletions core/src/libraries/metrics/processor.rs
@@ -1,5 +1,6 @@
use super::entry::MetricsEntry;
use super::SESSION_STARTUP_HISTOGRAM_BUCKETS;
use crate::libraries::helpers::keys;
use crate::libraries::scheduling::{Job, TaskManager};
use crate::{
libraries::resources::{ResourceManager, ResourceManagerProvider},
@@ -59,25 +60,41 @@ impl<C, R> MetricsProcessor<C, R> {
) -> RedisResult<()> {
match entry {
MetricsEntry::IncomingTraffic(bytes) => {
con.hincr::<_, _, _, ()>("metrics:http:net.bytes.total", "in", bytes)
con.hincr::<_, _, _, ()>(&*keys::metrics::http::NET_BYTES_TOTAL, "in", bytes)
.await
}
MetricsEntry::OutgoingTraffic(bytes) => {
con.hincr::<_, _, _, ()>("metrics:http:net.bytes.total", "out", bytes)
con.hincr::<_, _, _, ()>(&*keys::metrics::http::NET_BYTES_TOTAL, "out", bytes)
.await
}
MetricsEntry::RequestProcessed(method, status) => {
let key = format!("metrics:http:requestsTotal:{}", method.as_str().to_owned());
con.hincr::<_, _, _, ()>(key, status.as_u16(), 1).await
}
MetricsEntry::SessionStatusChange(new_status) => {
con.hincr::<_, _, _, ()>("metrics:sessions:total", format!("{}", new_status), 1)
.await
con.hincr::<_, _, _, ()>(
keys::metrics::http::requests_total(method.as_str()),
status.as_u16(),
1,
)
.await
}
MetricsEntry::SessionStarted(elapsed_time) => {
self.process_session_startup_histogram_entry(con, elapsed_time)
.await
}
MetricsEntry::StorageCapacityUpdated(storage_id, capacity) => {
con.hset::<_, _, _, ()>(
&*keys::metrics::storage::CAPACITY,
storage_id,
format!("{:.0}", capacity),
)
.await
}
MetricsEntry::StorageUsageUpdated(storage_id, usage) => {
con.hset::<_, _, _, ()>(
&*keys::metrics::storage::USAGE,
storage_id,
format!("{:.0}", usage),
)
.await
}
}
}

@@ -86,20 +103,19 @@ impl<C, R> MetricsProcessor<C, R> {
con: &mut Redis,
elapsed_time: f64,
) -> RedisResult<()> {
let base_key = "metrics:sessions:startup.histogram";
let buckets_key = format!("{}:buckets", base_key);
let count_key = format!("{}:count", base_key);
let sum_key = format!("{}:sum", base_key);
let buckets_key = &*keys::metrics::session::startup_histogram::BUCKETS;
let count_key = &*keys::metrics::session::startup_histogram::COUNT;
let sum_key = &*keys::metrics::session::startup_histogram::SUM;

for bucket in SESSION_STARTUP_HISTOGRAM_BUCKETS.iter() {
let float_bucket: f64 = (*bucket).into();

if float_bucket > elapsed_time {
con.hincr::<_, _, _, ()>(&buckets_key, *bucket, 1).await?;
con.hincr::<_, _, _, ()>(buckets_key, *bucket, 1).await?;
}
}

con.hincr::<_, _, _, ()>(&buckets_key, "+Inf", 1).await?;
con.hincr::<_, _, _, ()>(buckets_key, "+Inf", 1).await?;
con.incr::<_, _, ()>(count_key, 1).await?;
con.incr::<_, _, ()>(sum_key, elapsed_time).await
}
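
The two new match arms write one Redis hash per metric, using the storage ID as the field name, so multiple storage nodes can report independently. A minimal inspection sketch using the same redis crate API as above (connection setup elided; the key literal matches keys.rs):

    use redis::AsyncCommands;

    async fn dump_capacities(con: &mut redis::aio::Connection) -> redis::RedisResult<()> {
        // HGETALL metrics:storage:disk.bytes.total
        // yields e.g. [("storage-eu-1", "34359738368")]
        let capacities: Vec<(String, String)> =
            con.hgetall("metrics:storage:disk.bytes.total").await?;

        for (storage_id, bytes) in capacities {
            println!("{}: {} bytes total", storage_id, bytes);
        }

        Ok(())
    }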
5 changes: 5 additions & 0 deletions core/src/libraries/storage/storage_handler.rs
@@ -142,6 +142,11 @@ impl StorageHandler {
res
}

pub async fn used_bytes(&self) -> Result<f64, StorageError> {
let mut con = self.acquire_connection().await?;
Ok(database::used_bytes(&mut con).await?)
}

/// Runs a cleanup if the used bytes exceed the `size_threshold`
pub async fn maybe_cleanup(&self) -> Result<usize, StorageError> {
let mut con = self.acquire_connection().await?;
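
A short usage sketch for the new accessor (handler construction elided; note that maybe_cleanup applies the size_threshold check internally, so the read here is purely informational):

    async fn report_then_cleanup(handler: &StorageHandler) -> Result<(), StorageError> {
        // Current disk usage in bytes, as tracked by the storage database.
        let used = handler.used_bytes().await?;
        println!("storage currently holds {:.0} bytes", used);

        // Only deletes files if usage exceeds the configured threshold.
        let removed = handler.maybe_cleanup().await?;
        println!("cleanup removed {} files", removed);

        Ok(())
    }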
40 changes: 40 additions & 0 deletions core/src/services/metrics/data_collector.rs
@@ -219,3 +219,43 @@ pub async fn slots_available<C: AsyncCommands + ConnectionLike>(con: &mut C) ->
values: vec![MetricValue::from_value(slot_count)],
}
}

pub async fn storage_capacity<C: AsyncCommands + ConnectionLike>(con: &mut C) -> Metric {
let capacities: Vec<(String, f64)> = con
.hgetall(&*keys::metrics::storage::CAPACITY)
.await
.unwrap_or_default();

let values = capacities
.into_iter()
.map(|(id, capacity)| MetricValue::from_value(capacity).with_label("id", &id))
.collect();

Metric {
name: "webgrid_storage_disk_bytes_total".to_string(),
description: "Total number of bytes available for a given storage".to_string(),

metric_type: MetricType::Gauge,
values,
}
}

pub async fn storage_usage<C: AsyncCommands + ConnectionLike>(con: &mut C) -> Metric {
let usages: Vec<(String, f64)> = con
.hgetall(&*keys::metrics::storage::USAGE)
.await
.unwrap_or_default();

let values = usages
.into_iter()
.map(|(id, usage)| MetricValue::from_value(usage).with_label("id", &id))
.collect();

Metric {
name: "webgrid_storage_disk_bytes_used".to_string(),
description: "Bytes used in a given storage".to_string(),

metric_type: MetricType::Gauge,
values,
}
}
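
Assuming Metric's Display implementation follows the Prometheus text exposition format (an assumption; that impl is not part of this diff), the two collectors above would render roughly as:

    # HELP webgrid_storage_disk_bytes_total Total number of bytes available for a given storage
    # TYPE webgrid_storage_disk_bytes_total gauge
    webgrid_storage_disk_bytes_total{id="storage-eu-1"} 34359738368
    # HELP webgrid_storage_disk_bytes_used Bytes used in a given storage
    # TYPE webgrid_storage_disk_bytes_used gauge
    webgrid_storage_disk_bytes_used{id="storage-eu-1"} 1073741824

The id label comes from the hash field written by the processor, so each storage node appears as its own time series.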
2 changes: 2 additions & 0 deletions core/src/services/metrics/jobs/metric_handler.rs
@@ -70,6 +70,8 @@ impl MetricHandlerJob {
sessions_terminated(&mut con).await,
slots_available(&mut con).await,
slots_total(&mut con).await,
storage_capacity(&mut con).await,
storage_usage(&mut con).await,
]
.iter()
.map(|metric| format!("{}", metric))
7 changes: 6 additions & 1 deletion core/src/services/storage/context.rs
@@ -1,12 +1,16 @@
use crate::libraries::lifecycle::{BeatValue, HeartBeat};
use crate::libraries::resources::DefaultResourceManager;
use crate::libraries::{helpers::keys, resources::ResourceManagerProvider};
use crate::libraries::{
lifecycle::{BeatValue, HeartBeat},
metrics::MetricsProcessor,
};

#[derive(Clone)]
pub struct Context {
resource_manager: DefaultResourceManager,
pub heart_beat: HeartBeat<Self, DefaultResourceManager>,
pub storage_id: String,
pub metrics: MetricsProcessor<Self, DefaultResourceManager>,
}

impl Context {
@@ -28,6 +32,7 @@ impl Context {
resource_manager: DefaultResourceManager::new(redis_url),
heart_beat,
storage_id,
metrics: MetricsProcessor::default(),
}
}
}
25 changes: 25 additions & 0 deletions core/src/services/storage/jobs/cleanup.rs
@@ -1,4 +1,5 @@
use super::super::Context;
use crate::libraries::metrics::MetricsEntry;
use crate::libraries::scheduling::{Job, TaskManager};
use crate::libraries::storage::StorageHandler;
use anyhow::Result;
@@ -21,6 +22,15 @@ impl Job for CleanupJob {
const NAME: &'static str = module_path!();

async fn execute(&self, manager: TaskManager<Self::Context>) -> Result<()> {
manager
.context
.metrics
.submit(MetricsEntry::StorageCapacityUpdated(
manager.context.storage_id.to_owned(),
self.size_threshold,
))
.ok();

let storage = StorageHandler::new(
self.storage_directory.clone(),
self.size_threshold,
@@ -34,20 +44,35 @@ impl Job for CleanupJob {
let scan_interval = 60 / 5 * 24; // 288 five-minute cycles, i.e. every 24 hours

loop {
// Re-enumerate the filesystem to "fix" any drifts that may occur
if iteration_counter % scan_interval == 0 {
info!("Synchronising filesystem");
if let Err(e) = storage.scan_fs().await {
warn!("Error while synchronising file system: {}", e);
}
}

// Run the cleanup if the threshold is exceeded
debug!("Running cleanup cycle #{}", iteration_counter);
let file_count = storage.maybe_cleanup().await?;

if file_count > 0 {
info!("Cleaned up {} files", file_count);
}

// Update the storage metrics
if let Ok(usage) = storage.used_bytes().await {
manager
.context
.metrics
.submit(MetricsEntry::StorageUsageUpdated(
manager.context.storage_id.to_owned(),
usage,
))
.ok();
}

// Wait for the next cycle
sleep(Duration::from_secs(60 * 5)).await;
iteration_counter += 1;
}
2 changes: 2 additions & 0 deletions core/src/services/storage/mod.rs
@@ -64,12 +64,14 @@ pub async fn run(shared_options: SharedOptions, options: Options) -> Result<()>

let status_job = StatusServer::new(&scheduler, shared_options.status_server);
let heart_beat_job = context.heart_beat.clone();
let metrics_job = context.metrics.clone();
let server_job = ServerJob::new(options.port, options.storage_directory.clone());
let cleanup_job = CleanupJob::new(options.storage_directory, size_limit, cleanup_target);

schedule!(scheduler, context, {
status_job,
heart_beat_job,
metrics_job,
server_job,
cleanup_job
});