diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 18a0ab173aa7..2b9ad8127391 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -673,20 +673,6 @@ async fn init_tasks( .await?; } - if let Some(prometheus) = config.observability.prometheus() { - tracing::info!("Starting Prometheus exporter with configuration: {prometheus:?}"); - - let (prometheus_health_check, prometheus_health_updater) = - ReactiveHealthCheck::new("prometheus_exporter"); - app_health.insert_component(prometheus_health_check)?; - task_handles.push(tokio::spawn(async move { - prometheus_health_updater.update(HealthStatus::Ready.into()); - let result = prometheus.run(stop_receiver).await; - drop(prometheus_health_updater); - result - })); - } - Ok(()) } @@ -882,6 +868,24 @@ async fn run_node( ([0, 0, 0, 0], config.required.healthcheck_port).into(), app_health.clone(), ); + // Start exporting metrics at the very start so that e.g., snapshot recovery metrics are timely reported. + let prometheus_task = if let Some(prometheus) = config.observability.prometheus() { + tracing::info!("Starting Prometheus exporter with configuration: {prometheus:?}"); + + let (prometheus_health_check, prometheus_health_updater) = + ReactiveHealthCheck::new("prometheus_exporter"); + app_health.insert_component(prometheus_health_check)?; + let stop_receiver_for_exporter = stop_receiver.clone(); + Some(tokio::spawn(async move { + prometheus_health_updater.update(HealthStatus::Ready.into()); + let result = prometheus.run(stop_receiver_for_exporter).await; + drop(prometheus_health_updater); + result + })) + } else { + None + }; + // Start scraping Postgres metrics before store initialization as well. let pool_for_metrics = singleton_pool_builder.build().await?; let mut stop_receiver_for_metrics = stop_receiver.clone(); @@ -919,6 +923,7 @@ async fn run_node( Ok(()) }); let mut task_handles = vec![metrics_task, validate_chain_ids_task, version_sync_task]; + task_handles.extend(prometheus_task); // Make sure that the node storage is initialized either via genesis or snapshot recovery. ensure_storage_initialized( diff --git a/core/lib/merkle_tree/src/metrics.rs b/core/lib/merkle_tree/src/metrics.rs index 8c8fdc4aeaa4..2190b9acaa07 100644 --- a/core/lib/merkle_tree/src/metrics.rs +++ b/core/lib/merkle_tree/src/metrics.rs @@ -67,37 +67,37 @@ const LEAF_LEVEL_BUCKETS: Buckets = Buckets::linear(20.0..=40.0, 4.0); #[metrics(prefix = "merkle_tree_extend_patch")] struct TreeUpdateMetrics { // Metrics related to the AR16MT tree architecture - /// Number of new leaves inserted during tree traversal while processing a single block. + /// Number of new leaves inserted during tree traversal while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] new_leaves: Histogram, - /// Number of new internal nodes inserted during tree traversal while processing a single block. + /// Number of new internal nodes inserted during tree traversal while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] new_internal_nodes: Histogram, - /// Number of existing leaves moved to a new location while processing a single block. + /// Number of existing leaves moved to a new location while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] moved_leaves: Histogram, - /// Number of existing leaves updated while processing a single block. + /// Number of existing leaves updated while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] updated_leaves: Histogram, - /// Average level of leaves moved or created while processing a single block. + /// Average level of leaves moved or created while processing a single batch. #[metrics(buckets = LEAF_LEVEL_BUCKETS)] avg_leaf_level: Histogram, - /// Maximum level of leaves moved or created while processing a single block. + /// Maximum level of leaves moved or created while processing a single batch. #[metrics(buckets = LEAF_LEVEL_BUCKETS)] max_leaf_level: Histogram, // Metrics related to input instructions - /// Number of keys read while processing a single block (only applicable to the full operation mode). + /// Number of keys read while processing a single batch (only applicable to the full operation mode). #[metrics(buckets = NODE_COUNT_BUCKETS)] key_reads: Histogram, - /// Number of missing keys read while processing a single block (only applicable to the full + /// Number of missing keys read while processing a single batch (only applicable to the full /// operation mode). #[metrics(buckets = NODE_COUNT_BUCKETS)] missing_key_reads: Histogram, - /// Number of nodes of previous versions read from the DB while processing a single block. + /// Number of nodes of previous versions read from the DB while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] db_reads: Histogram, - /// Number of nodes of the current version re-read from the patch set while processing a single block. + /// Number of nodes of the current version re-read from the patch set while processing a single batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] patch_reads: Histogram, } @@ -194,13 +194,13 @@ impl ops::AddAssign for TreeUpdaterStats { #[derive(Debug, Metrics)] #[metrics(prefix = "merkle_tree")] pub(crate) struct BlockTimings { - /// Time spent loading tree nodes from DB per block. + /// Time spent loading tree nodes from DB per batch. #[metrics(buckets = Buckets::LATENCIES)] pub load_nodes: Histogram, - /// Time spent traversing the tree and creating new nodes per block. + /// Time spent traversing the tree and creating new nodes per batch. #[metrics(buckets = Buckets::LATENCIES)] pub extend_patch: Histogram, - /// Time spent finalizing the block (mainly hash computations). + /// Time spent finalizing a batch (mainly hash computations). #[metrics(buckets = Buckets::LATENCIES)] pub finalize_patch: Histogram, } @@ -233,13 +233,13 @@ impl fmt::Display for NibbleCount { #[derive(Debug, Metrics)] #[metrics(prefix = "merkle_tree_apply_patch")] struct ApplyPatchMetrics { - /// Total number of nodes included into a RocksDB patch per block. + /// Total number of nodes included into a RocksDB patch per batch. #[metrics(buckets = NODE_COUNT_BUCKETS)] nodes: Histogram, - /// Number of nodes included into a RocksDB patch per block, grouped by the key nibble count. + /// Number of nodes included into a RocksDB patch per batch, grouped by the key nibble count. #[metrics(buckets = NODE_COUNT_BUCKETS)] nodes_by_nibble_count: Family>, - /// Total byte size of nodes included into a RocksDB patch per block, grouped by the key nibble count. + /// Total byte size of nodes included into a RocksDB patch per batch, grouped by the key nibble count. #[metrics(buckets = BYTE_SIZE_BUCKETS)] node_bytes: Family>, /// Number of hashes in child references copied from previous tree versions. Allows to estimate @@ -295,7 +295,7 @@ impl ApplyPatchStats { for (nibble_count, stats) in node_bytes { let label = NibbleCount::new(nibble_count); metrics.nodes_by_nibble_count[&label].observe(stats.count); - metrics.nodes_by_nibble_count[&label].observe(stats.bytes); + metrics.node_bytes[&label].observe(stats.bytes); } metrics.copied_hashes.observe(self.copied_hashes); @@ -359,3 +359,39 @@ pub(crate) struct PruningTimings { #[vise::register] pub(crate) static PRUNING_TIMINGS: Global = Global::new(); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(crate) enum RecoveryStage { + Extend, + ApplyPatch, +} + +const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[ + 1_000.0, + 2_000.0, + 5_000.0, + 10_000.0, + 20_000.0, + 50_000.0, + 100_000.0, + 200_000.0, + 500_000.0, + 1_000_000.0, + 2_000_000.0, + 5_000_000.0, +]); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "merkle_tree_recovery")] +pub(crate) struct RecoveryMetrics { + /// Number of entries in a recovered chunk. + #[metrics(buckets = CHUNK_SIZE_BUCKETS)] + pub chunk_size: Histogram, + /// Latency of a specific stage of recovery for a single chunk. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub stage_latency: Family>, +} + +#[vise::register] +pub(crate) static RECOVERY_METRICS: Global = Global::new(); diff --git a/core/lib/merkle_tree/src/recovery.rs b/core/lib/merkle_tree/src/recovery.rs index aecda593a254..8c4c7066be76 100644 --- a/core/lib/merkle_tree/src/recovery.rs +++ b/core/lib/merkle_tree/src/recovery.rs @@ -41,6 +41,7 @@ use zksync_crypto::hasher::blake2::Blake2Hasher; use crate::{ hasher::{HashTree, HasherWithStats}, + metrics::{RecoveryStage, RECOVERY_METRICS}, storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage}, types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash}, }; @@ -149,15 +150,18 @@ impl MerkleTreeRecovery { )] pub fn extend_linear(&mut self, entries: Vec) { tracing::debug!("Started extending tree"); + RECOVERY_METRICS.chunk_size.observe(entries.len()); - let started_at = Instant::now(); + let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::Extend].start(); let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false); let patch = storage.extend_during_linear_recovery(entries); - tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed()); + let stage_latency = stage_latency.observe(); + tracing::debug!("Finished processing keys; took {stage_latency:?}"); - let started_at = Instant::now(); + let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::ApplyPatch].start(); self.db.apply_patch(patch); - tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed()); + let stage_latency = stage_latency.observe(); + tracing::debug!("Finished persisting to DB; took {stage_latency:?}"); } /// Extends a tree with a chunk of entries. Unlike [`Self::extend_linear()`], entries may be @@ -172,15 +176,18 @@ impl MerkleTreeRecovery { )] pub fn extend_random(&mut self, entries: Vec) { tracing::debug!("Started extending tree"); + RECOVERY_METRICS.chunk_size.observe(entries.len()); - let started_at = Instant::now(); + let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::Extend].start(); let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false); let patch = storage.extend_during_random_recovery(entries); - tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed()); + let stage_latency = stage_latency.observe(); + tracing::debug!("Finished processing keys; took {stage_latency:?}"); - let started_at = Instant::now(); + let stage_latency = RECOVERY_METRICS.stage_latency[&RecoveryStage::ApplyPatch].start(); self.db.apply_patch(patch); - tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed()); + let stage_latency = stage_latency.observe(); + tracing::debug!("Finished persisting to DB; took {stage_latency:?}"); } /// Finalizes the recovery process marking it as complete in the tree manifest.