# fix(en): Fix recovery-related metrics (#2014)
## What ❔

- Starts the metrics exporter on the EN immediately so that it covers snapshot
recovery (see the sketch after this list).
- Fixes / extends Merkle tree recovery metrics (e.g., the incorrectly
reported `nodes_by_nibble_count`).
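
Below is a minimal, hedged sketch of the pattern behind the first bullet: spawn the Prometheus exporter task before storage initialization, keep its `JoinHandle`, and fold it into the common task list afterwards. It assumes `tokio` and `anyhow` as dependencies; `run_exporter`, `run_node_sketch`, and `exporter_enabled` are illustrative stand-ins, not the EN's actual API; the real wiring is in the `main.rs` diff further down.

```rust
use tokio::{sync::watch, task::JoinHandle};

// Stand-in for the Prometheus exporter: a real one would serve `/metrics`;
// this stub just waits for the stop signal.
async fn run_exporter(mut stop: watch::Receiver<bool>) -> anyhow::Result<()> {
    let _ = stop.changed().await;
    Ok(())
}

async fn run_node_sketch(exporter_enabled: bool) -> anyhow::Result<()> {
    let (_stop_sender, stop_receiver) = watch::channel(false);

    // Spawned before storage initialization, so snapshot recovery metrics get exported.
    let exporter_task: Option<JoinHandle<anyhow::Result<()>>> =
        exporter_enabled.then(|| tokio::spawn(run_exporter(stop_receiver.clone())));

    // ...initialize storage (genesis or snapshot recovery), start the remaining tasks...
    let mut task_handles: Vec<JoinHandle<anyhow::Result<()>>> = Vec::new();
    // `Option` iterates over 0 or 1 items, so the exporter handle slots in seamlessly.
    task_handles.extend(exporter_task);

    for handle in task_handles {
        handle.await??;
    }
    Ok(())
}
```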

## Why ❔

Improves metrics coverage.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored May 23, 2024
1 parent 4e96e32 commit 86355d6
Showing 3 changed files with 87 additions and 39 deletions.
33 changes: 19 additions & 14 deletions core/bin/external_node/src/main.rs
@@ -673,20 +673,6 @@ async fn init_tasks(
.await?;
}

if let Some(prometheus) = config.observability.prometheus() {
tracing::info!("Starting Prometheus exporter with configuration: {prometheus:?}");

let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
app_health.insert_component(prometheus_health_check)?;
task_handles.push(tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
let result = prometheus.run(stop_receiver).await;
drop(prometheus_health_updater);
result
}));
}

Ok(())
}

@@ -882,6 +868,24 @@ async fn run_node(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
app_health.clone(),
);
// Start exporting metrics at the very start so that, e.g., snapshot recovery metrics are reported in a timely manner.
let prometheus_task = if let Some(prometheus) = config.observability.prometheus() {
tracing::info!("Starting Prometheus exporter with configuration: {prometheus:?}");

let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
app_health.insert_component(prometheus_health_check)?;
let stop_receiver_for_exporter = stop_receiver.clone();
Some(tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
let result = prometheus.run(stop_receiver_for_exporter).await;
drop(prometheus_health_updater);
result
}))
} else {
None
};

// Start scraping Postgres metrics before store initialization as well.
let pool_for_metrics = singleton_pool_builder.build().await?;
let mut stop_receiver_for_metrics = stop_receiver.clone();
@@ -919,6 +923,7 @@ async fn run_node(
Ok(())
});
let mut task_handles = vec![metrics_task, validate_chain_ids_task, version_sync_task];
task_handles.extend(prometheus_task);

// Make sure that the node storage is initialized either via genesis or snapshot recovery.
ensure_storage_initialized(
70 changes: 53 additions & 17 deletions core/lib/merkle_tree/src/metrics.rs
@@ -67,37 +67,37 @@ const LEAF_LEVEL_BUCKETS: Buckets = Buckets::linear(20.0..=40.0, 4.0);
#[metrics(prefix = "merkle_tree_extend_patch")]
struct TreeUpdateMetrics {
// Metrics related to the AR16MT tree architecture
/// Number of new leaves inserted during tree traversal while processing a single block.
/// Number of new leaves inserted during tree traversal while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
new_leaves: Histogram<u64>,
/// Number of new internal nodes inserted during tree traversal while processing a single block.
/// Number of new internal nodes inserted during tree traversal while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
new_internal_nodes: Histogram<u64>,
/// Number of existing leaves moved to a new location while processing a single block.
/// Number of existing leaves moved to a new location while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
moved_leaves: Histogram<u64>,
/// Number of existing leaves updated while processing a single block.
/// Number of existing leaves updated while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
updated_leaves: Histogram<u64>,
/// Average level of leaves moved or created while processing a single block.
/// Average level of leaves moved or created while processing a single batch.
#[metrics(buckets = LEAF_LEVEL_BUCKETS)]
avg_leaf_level: Histogram<f64>,
/// Maximum level of leaves moved or created while processing a single block.
/// Maximum level of leaves moved or created while processing a single batch.
#[metrics(buckets = LEAF_LEVEL_BUCKETS)]
max_leaf_level: Histogram<u64>,

// Metrics related to input instructions
/// Number of keys read while processing a single block (only applicable to the full operation mode).
/// Number of keys read while processing a single batch (only applicable to the full operation mode).
#[metrics(buckets = NODE_COUNT_BUCKETS)]
key_reads: Histogram<u64>,
/// Number of missing keys read while processing a single block (only applicable to the full
/// Number of missing keys read while processing a single batch (only applicable to the full
/// operation mode).
#[metrics(buckets = NODE_COUNT_BUCKETS)]
missing_key_reads: Histogram<u64>,
/// Number of nodes of previous versions read from the DB while processing a single block.
/// Number of nodes of previous versions read from the DB while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
db_reads: Histogram<u64>,
/// Number of nodes of the current version re-read from the patch set while processing a single block.
/// Number of nodes of the current version re-read from the patch set while processing a single batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
patch_reads: Histogram<u64>,
}
@@ -194,13 +194,13 @@ impl ops::AddAssign for TreeUpdaterStats {
#[derive(Debug, Metrics)]
#[metrics(prefix = "merkle_tree")]
pub(crate) struct BlockTimings {
/// Time spent loading tree nodes from DB per block.
/// Time spent loading tree nodes from DB per batch.
#[metrics(buckets = Buckets::LATENCIES)]
pub load_nodes: Histogram<Duration>,
/// Time spent traversing the tree and creating new nodes per block.
/// Time spent traversing the tree and creating new nodes per batch.
#[metrics(buckets = Buckets::LATENCIES)]
pub extend_patch: Histogram<Duration>,
/// Time spent finalizing the block (mainly hash computations).
/// Time spent finalizing a batch (mainly hash computations).
#[metrics(buckets = Buckets::LATENCIES)]
pub finalize_patch: Histogram<Duration>,
}
@@ -233,13 +233,13 @@ impl fmt::Display for NibbleCount {
#[derive(Debug, Metrics)]
#[metrics(prefix = "merkle_tree_apply_patch")]
struct ApplyPatchMetrics {
/// Total number of nodes included into a RocksDB patch per block.
/// Total number of nodes included into a RocksDB patch per batch.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
nodes: Histogram<u64>,
/// Number of nodes included into a RocksDB patch per block, grouped by the key nibble count.
/// Number of nodes included into a RocksDB patch per batch, grouped by the key nibble count.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
nodes_by_nibble_count: Family<NibbleCount, Histogram<u64>>,
/// Total byte size of nodes included into a RocksDB patch per block, grouped by the key nibble count.
/// Total byte size of nodes included into a RocksDB patch per batch, grouped by the key nibble count.
#[metrics(buckets = BYTE_SIZE_BUCKETS)]
node_bytes: Family<NibbleCount, Histogram<u64>>,
/// Number of hashes in child references copied from previous tree versions. Allows estimating
@@ -295,7 +295,7 @@ impl ApplyPatchStats {
for (nibble_count, stats) in node_bytes {
let label = NibbleCount::new(nibble_count);
metrics.nodes_by_nibble_count[&label].observe(stats.count);
metrics.nodes_by_nibble_count[&label].observe(stats.bytes);
metrics.node_bytes[&label].observe(stats.bytes);
}

metrics.copied_hashes.observe(self.copied_hashes);
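
The two `observe(stats.bytes)` lines above are the `nodes_by_nibble_count` fix called out in the PR description: the first (on `nodes_by_nibble_count`) is the removed line, the second (on `node_bytes`) is its replacement. Previously the byte-size sample landed in the node-count family, so `node_bytes` stayed empty and `nodes_by_nibble_count` received two samples per nibble count. Below is a self-contained sketch of the restored "same label, two families" pattern, modeled on the `vise` usage visible in this diff (`SizeClass`, the metric names, the buckets, and the `use vise::...` import list are assumptions for illustration):

```rust
use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Global, Histogram, Metrics};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "size_class", rename_all = "snake_case")]
enum SizeClass {
    Small,
    Large,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "example_patch")]
struct PatchMetrics {
    /// Number of nodes per patch, grouped by size class.
    #[metrics(buckets = Buckets::linear(0.0..=100.0, 10.0))]
    nodes: Family<SizeClass, Histogram<u64>>,
    /// Total byte size of nodes per patch, grouped by size class.
    #[metrics(buckets = Buckets::linear(0.0..=100_000.0, 10_000.0))]
    node_bytes: Family<SizeClass, Histogram<u64>>,
}

#[vise::register]
static PATCH_METRICS: Global<PatchMetrics> = Global::new();

fn report_patch(class: SizeClass, node_count: u64, byte_size: u64) {
    // The essence of the fix: counts and byte sizes go to *different* families,
    // keyed by the same label.
    PATCH_METRICS.nodes[&class].observe(node_count);
    PATCH_METRICS.node_bytes[&class].observe(byte_size);
}
```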
@@ -359,3 +359,39 @@ pub(crate) struct PruningTimings {

#[vise::register]
pub(crate) static PRUNING_TIMINGS: Global<PruningTimings> = Global::new();

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "stage", rename_all = "snake_case")]
pub(crate) enum RecoveryStage {
Extend,
ApplyPatch,
}

const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[
1_000.0,
2_000.0,
5_000.0,
10_000.0,
20_000.0,
50_000.0,
100_000.0,
200_000.0,
500_000.0,
1_000_000.0,
2_000_000.0,
5_000_000.0,
]);

#[derive(Debug, Metrics)]
#[metrics(prefix = "merkle_tree_recovery")]
pub(crate) struct RecoveryMetrics {
/// Number of entries in a recovered chunk.
#[metrics(buckets = CHUNK_SIZE_BUCKETS)]
pub chunk_size: Histogram<usize>,
/// Latency of a specific stage of recovery for a single chunk.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub stage_latency: Family<RecoveryStage, Histogram<Duration>>,
}

#[vise::register]
pub(crate) static RECOVERY_METRICS: Global<RecoveryMetrics> = Global::new();
23 changes: 15 additions & 8 deletions core/lib/merkle_tree/src/recovery.rs
@@ -41,6 +41,7 @@ use zksync_crypto::hasher::blake2::Blake2Hasher;

use crate::{
hasher::{HashTree, HasherWithStats},
metrics::{RecoveryStage, RECOVERY_METRICS},
storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage},
types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash},
};
@@ -149,15 +150,18 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
)]
pub fn extend_linear(&mut self, entries: Vec<TreeEntry>) {
tracing::debug!("Started extending tree");
RECOVERY_METRICS.chunk_size.observe(entries.len());

let started_at = Instant::now();
let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::Extend].start();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_linear_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());
let stage_latency = stage_latency.observe();
tracing::debug!("Finished processing keys; took {stage_latency:?}");

let started_at = Instant::now();
let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::ApplyPatch].start();
self.db.apply_patch(patch);
tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed());
let stage_latency = stage_latency.observe();
tracing::debug!("Finished persisting to DB; took {stage_latency:?}");
}

/// Extends a tree with a chunk of entries. Unlike [`Self::extend_linear()`], entries may be
@@ -172,15 +176,18 @@
)]
pub fn extend_random(&mut self, entries: Vec<TreeEntry>) {
tracing::debug!("Started extending tree");
RECOVERY_METRICS.chunk_size.observe(entries.len());

let started_at = Instant::now();
let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::Extend].start();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_random_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());
let stage_latency = stage_latency.observe();
tracing::debug!("Finished processing keys; took {stage_latency:?}");

let started_at = Instant::now();
let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::ApplyPatch].start();
self.db.apply_patch(patch);
tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed());
let stage_latency = stage_latency.observe();
tracing::debug!("Finished persisting to DB; took {stage_latency:?}");
}

/// Finalizes the recovery process marking it as complete in the tree manifest.
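
A note on the rebinding of `stage_latency` in the methods above: with `vise`, calling `.start()` on a latency histogram returns an observer guard, and `.observe()` on that guard records the sample and returns the measured latency, which the new code reuses in its debug logs instead of a separately tracked `Instant`. Here is a minimal sketch of that pattern, assuming the `vise` API matches how it is used in this diff (the metric names and the `println!` logging are illustrative):

```rust
use std::time::Duration;

use vise::{Buckets, Global, Histogram, Metrics};

#[derive(Debug, Metrics)]
#[metrics(prefix = "example_recovery")]
struct ExampleMetrics {
    /// Latency of a single recovery step.
    #[metrics(buckets = Buckets::LATENCIES)]
    step_latency: Histogram<Duration>,
}

#[vise::register]
static METRICS: Global<ExampleMetrics> = Global::new();

fn run_step() {
    // `start()` begins timing; `observe()` records the sample and returns the elapsed time.
    let latency = METRICS.step_latency.start();
    // ... do the actual work here ...
    let elapsed = latency.observe();
    println!("step took {elapsed:?}");
}
```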
