From 168d9f0eb780952605308c829910e1820d29e34d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 2 Jan 2025 17:21:58 +0800 Subject: [PATCH] HDiff in the hot DB - squashed --- .../src/validator/slashing_protection.rs | 4 +- .../beacon_chain/src/block_verification.rs | 85 +-- beacon_node/beacon_chain/src/builder.rs | 46 +- beacon_node/beacon_chain/src/migrate.rs | 52 +- beacon_node/beacon_chain/src/schema_change.rs | 9 + .../src/schema_change/migration_schema_v23.rs | 234 ++++++ beacon_node/beacon_chain/src/summaries_dag.rs | 3 +- beacon_node/beacon_chain/tests/store_tests.rs | 10 +- beacon_node/store/src/config.rs | 10 +- beacon_node/store/src/errors.rs | 14 + beacon_node/store/src/garbage_collection.rs | 1 + beacon_node/store/src/hdiff.rs | 65 +- beacon_node/store/src/hot_cold_store.rs | 713 +++++++++++------- beacon_node/store/src/impls/beacon_state.rs | 19 +- beacon_node/store/src/lib.rs | 32 +- beacon_node/store/src/metadata.rs | 34 +- 16 files changed, 918 insertions(+), 413 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs diff --git a/account_manager/src/validator/slashing_protection.rs b/account_manager/src/validator/slashing_protection.rs index bcd860a4847..57d532d0ae2 100644 --- a/account_manager/src/validator/slashing_protection.rs +++ b/account_manager/src/validator/slashing_protection.rs @@ -90,7 +90,7 @@ pub fn cli_run( let slashing_protection_database = SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| { format!( - "Unable to open database at {}: {:?}", + "Unable to open slashing protection database at {}: {:?}", slashing_protection_db_path.display(), e ) @@ -198,7 +198,7 @@ pub fn cli_run( let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path) .map_err(|e| { format!( - "Unable to open database at {}: {:?}", + "Unable to open slashing protection database at {}: {:?}", slashing_protection_db_path.display(), e ) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index ddb7bb614a3..eb79669e9e1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -91,7 +91,7 @@ use std::fmt::Debug; use std::fs; use std::io::Write; use std::sync::Arc; -use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; +use store::{Error as DBError, KeyValueStore, StoreOp}; use strum::AsRefStr; use task_executor::JoinHandle; use types::{ @@ -1455,52 +1455,49 @@ impl ExecutionPendingBlock { let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64()); for _ in 0..distance { - let state_root = if parent.beacon_block.slot() == state.slot() { - // If it happens that `pre_state` has *not* already been advanced forward a single - // slot, then there is no need to compute the state root for this - // `per_slot_processing` call since that state root is already stored in the parent - // block. - parent.beacon_block.state_root() - } else { - // This is a new state we've reached, so stage it for storage in the DB. - // Computing the state root here is time-equivalent to computing it during slot - // processing, but we get early access to it. - let state_root = state.update_tree_hash_cache()?; - - // Store the state immediately, marking it as temporary, and staging the deletion - // of its temporary status as part of the larger atomic operation. 
- let txn_lock = chain.store.hot_db.begin_rw_transaction(); - let state_already_exists = - chain.store.load_hot_state_summary(&state_root)?.is_some(); - - let state_batch = if state_already_exists { - // If the state exists, it could be temporary or permanent, but in neither case - // should we rewrite it or store a new temporary flag for it. We *will* stage - // the temporary flag for deletion because it's OK to double-delete the flag, - // and we don't mind if another thread gets there first. - vec![] + let state_root = + if parent.beacon_block.slot() == state.slot() { + // If it happens that `pre_state` has *not* already been advanced forward a single + // slot, then there is no need to compute the state root for this + // `per_slot_processing` call since that state root is already stored in the parent + // block. + parent.beacon_block.state_root() } else { - vec![ - if state.slot() % T::EthSpec::slots_per_epoch() == 0 { - StoreOp::PutState(state_root, &state) - } else { - StoreOp::PutStateSummary( - state_root, - HotStateSummary::new(&state_root, &state)?, - ) - }, - StoreOp::PutStateTemporaryFlag(state_root), - ] - }; - chain - .store - .do_atomically_with_block_and_blobs_cache(state_batch)?; - drop(txn_lock); + // This is a new state we've reached, so stage it for storage in the DB. + // Computing the state root here is time-equivalent to computing it during slot + // processing, but we get early access to it. + let state_root = state.update_tree_hash_cache()?; + + // Store the state immediately, marking it as temporary, and staging the deletion + // of its temporary status as part of the larger atomic operation. + // TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to + // check that the summary exists at all? Are double writes common? Can this txn + // lock deadlock with the `do_atomically` call? + let txn_lock = chain.store.hot_db.begin_rw_transaction(); + let state_already_exists = + chain.store.load_hot_state_summary(&state_root)?.is_some(); + + if state_already_exists { + // If the state exists, it could be temporary or permanent, but in neither case + // should we rewrite it or store a new temporary flag for it. We *will* stage + // the temporary flag for deletion because it's OK to double-delete the flag, + // and we don't mind if another thread gets there first. + } else { + let mut ops = vec![]; + // Recycle store codepath to create a state summary and store the state / diff + chain.store.store_hot_state(&state_root, &state, &mut ops)?; + // Additionally write a temporary flag as part of the atomic write + ops.extend(chain.store.convert_to_kv_batch(vec![ + StoreOp::PutStateTemporaryFlag(state_root), + ])?); + chain.store.hot_db.do_atomically(ops)?; + } + drop(txn_lock); - confirmed_state_roots.push(state_root); + confirmed_state_roots.push(state_root); - state_root - }; + state_root + }; if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? { // Expose Prometheus metrics. 
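For context on the check-then-write pattern introduced above in block_verification.rs (take the hot DB transaction lock, check whether a summary already exists, then batch the state write together with its temporary flag into one atomic write), here is a minimal standalone sketch. The types and names below are toy stand-ins for illustration only, not Lighthouse APIs.

use std::collections::HashMap;
use std::sync::Mutex;

// Toy key-value store standing in for the hot DB.
#[derive(Default)]
struct ToyHotDb {
    // Guards the read-check plus batch-write against racing writers, similar
    // in spirit to `begin_rw_transaction` in the patch above.
    txn_lock: Mutex<()>,
    kv: Mutex<HashMap<String, Vec<u8>>>,
}

enum Op {
    Put(String, Vec<u8>),
}

impl ToyHotDb {
    fn do_atomically(&self, ops: Vec<Op>) {
        let mut kv = self.kv.lock().unwrap();
        for op in ops {
            match op {
                Op::Put(k, v) => {
                    kv.insert(k, v);
                }
            }
        }
    }

    /// Store a state summary/diff together with its temporary flag in one
    /// batch, skipping the write entirely if the summary already exists.
    fn store_temporary_state(&self, state_root: &str, state_bytes: Vec<u8>) {
        let _txn = self.txn_lock.lock().unwrap();
        let already_exists = self
            .kv
            .lock()
            .unwrap()
            .contains_key(&format!("summary:{state_root}"));
        if !already_exists {
            self.do_atomically(vec![
                Op::Put(format!("summary:{state_root}"), state_bytes),
                Op::Put(format!("temp_flag:{state_root}"), vec![1]),
            ]);
        }
    }
}

fn main() {
    let db = ToyHotDb::default();
    db.store_temporary_state("0xabc", vec![0u8; 4]);
    // A second call is a no-op because the summary is already present; only
    // the temporary flag would need re-deletion later, which is harmless.
    db.store_temporary_state("0xabc", vec![0u8; 4]);
    println!("stored keys: {}", db.kv.lock().unwrap().len());
}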
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ab3bffb0ff3..167f162745f 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -38,8 +38,8 @@ use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use types::{ - BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec, - FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended, + Hash256, Signature, SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -398,7 +398,11 @@ where let retain_historic_states = self.chain_config.reconstruct_historic_states; self.pending_io_batch.push( store - .init_anchor_info(genesis.beacon_block.message(), retain_historic_states) + .init_anchor_info( + genesis.beacon_block.message(), + Slot::new(0), + retain_historic_states, + ) .map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?, ); self.pending_io_batch.push( @@ -518,6 +522,14 @@ where } } + debug!( + log, + "Storing split from weak subjectivity state"; + "slot" => weak_subj_slot, + "state_root" => ?weak_subj_state_root, + "block_root" => ?weak_subj_block_root, + ); + // Set the store's split point *before* storing genesis so that genesis is stored // immediately in the freezer DB. store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root); @@ -539,6 +551,19 @@ where .do_atomically(block_root_batch) .map_err(|e| format!("Error writing frozen block roots: {e:?}"))?; + // Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store + // states that do not align with the start_slot grid + let retain_historic_states = self.chain_config.reconstruct_historic_states; + self.pending_io_batch.push( + store + .init_anchor_info( + weak_subj_block.message(), + weak_subj_slot, + retain_historic_states, + ) + .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?, + ); + // Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten // about on a crash restart. store @@ -548,6 +573,8 @@ where weak_subj_state.clone(), ) .map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?; + // Note: post hot hdiff must update the anchor info before attempting to put_state otherwise + // the write will fail if the weak_subj_slot is not aligned with the snapshot moduli. store .put_state(&weak_subj_state_root, &weak_subj_state) .map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?; @@ -563,13 +590,7 @@ where // Stage the database's metadata fields for atomic storage when `build` is called. // This prevents the database from restarting in an inconsistent state if the anchor // info or split point is written before the `PersistedBeaconChain`. 
- let retain_historic_states = self.chain_config.reconstruct_historic_states; self.pending_io_batch.push(store.store_split_in_batch()); - self.pending_io_batch.push( - store - .init_anchor_info(weak_subj_block.message(), retain_historic_states) - .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?, - ); self.pending_io_batch.push( store .init_blob_info(weak_subj_block.slot()) @@ -581,13 +602,6 @@ where .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?, ); - // Store pruning checkpoint to prevent attempting to prune before the anchor state. - self.pending_io_batch - .push(store.pruning_checkpoint_store_op(Checkpoint { - root: weak_subj_block_root, - epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()), - })); - let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, beacon_block: Arc::new(weak_subj_block), diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 969f4206f36..8de60c0226b 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -1,11 +1,11 @@ use crate::errors::BeaconChainError; use crate::summaries_dag::{ - BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error as SummariesDagError, + BlockSummariesDAG, DAGBlockSummary, DAGStateSummary, Error as SummariesDagError, StateSummariesDAG, }; use parking_lot::Mutex; use slog::{debug, error, info, warn, Logger}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::mem; use std::sync::{mpsc, Arc}; use std::thread; @@ -462,7 +462,6 @@ impl, Cold: ItemStore> BackgroundMigrator Result { - let split_state_root = store.get_split_info().state_root; let new_finalized_slot = new_finalized_checkpoint .epoch .start_slot(E::slots_per_epoch()); @@ -494,7 +493,7 @@ impl, Cold: ItemStore> BackgroundMigrator>(); + .collect::>(); // De-duplicate block roots to reduce block reads below let summary_block_roots = HashSet::::from_iter( @@ -528,18 +527,8 @@ impl, Cold: ItemStore> BackgroundMigrator, BeaconChainError>>()?; - let parent_block_roots = blocks - .iter() - .map(|(block_root, block)| (*block_root, block.parent_root)) - .collect::>(); - ( - StateSummariesDAG::new_from_v22( - state_summaries, - parent_block_roots, - split_state_root, - ) - .map_err(PruningError::SummariesDagError)?, + StateSummariesDAG::new(state_summaries), BlockSummariesDAG::new(&blocks), ) }; @@ -585,10 +574,17 @@ impl, Cold: ItemStore> BackgroundMigrator = HashSet::new(); let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new(); + let mut kept_summaries_for_hdiff = vec![]; for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() { for (state_root, summary) in summaries { @@ -597,6 +593,30 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator newly_finalized_blocks_min_slot, "newly_finalized_state_roots" => newly_finalized_state_roots.len(), "newly_finalized_states_min_slot" => newly_finalized_states_min_slot, + "required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots, + "kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff, "state_summaries_count" => state_summaries_dag.summaries_count(), "finalized_and_descendant_block_roots" => finalized_and_descendant_block_roots.len(), "blocks_to_prune_count" => blocks_to_prune.len(), diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 95049012292..a887f5ce63c 100644 --- 
a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v20; mod migration_schema_v21; mod migration_schema_v22; +mod migration_schema_v23; use crate::beacon_chain::BeaconChainTypes; use slog::Logger; @@ -59,6 +60,14 @@ pub fn migrate_schema( // bumped inside the upgrade_to_v22 fn migration_schema_v22::upgrade_to_v22::(db.clone(), genesis_state_root, log) } + (SchemaVersion(22), SchemaVersion(23)) => { + let ops = migration_schema_v23::upgrade_to_v23::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(23), SchemaVersion(22)) => { + let ops = migration_schema_v23::downgrade_to_v22::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs new file mode 100644 index 00000000000..32bc1b4702e --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs @@ -0,0 +1,234 @@ +use crate::{ + beacon_chain::BeaconChainTypes, + summaries_dag::{DAGStateSummaryV22, StateSummariesDAG}, +}; +use slog::{debug, info, Logger}; +use ssz::Decode; +use ssz_derive::{Decode, Encode}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Duration, Instant}, +}; +use store::{ + get_full_state_v22, get_key_for_col, hdiff::StorageStrategy, hot_cold_store::DiffBaseStateRoot, + DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem, +}; +use types::{EthSpec, Hash256, Slot}; + +#[derive(Debug, Clone, Copy, Encode, Decode)] +pub struct HotStateSummaryV22 { + slot: Slot, + latest_block_root: Hash256, + epoch_boundary_state_root: Hash256, +} + +pub fn upgrade_to_v23( + db: Arc>, + log: Logger, +) -> Result, Error> { + let split = db.get_split_info(); + + // Update anchor_slot to the current finalized state + // TODO(hdiff): Is the anchor loaded already at this point? Should be set to split slot or to + // the finalized state slot? + let anchor_info = db.get_anchor_info(); + let mut new_anchor_info = anchor_info.clone(); + new_anchor_info.anchor_slot = split.slot; + db.compare_and_set_anchor_info_with_write(anchor_info, new_anchor_info)?; + + let state_summaries_dag = new_dag::(&db, split.state_root)?; + + // Sort summaries by slot so we have their ancestor diffs already stored when we store them. + // If the summaries are sorted topologically we can insert them into the DB like if they were a + // new state, re-using existing code. As states are likely to be sequential the diff cache + // should kick in making the migration more efficient. If we just iterate the column of + // summaries we may get distance state of each iteration. 
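As a small illustration of the slot-ascending ordering described in the comment above (every diff base lives at a lower slot, so migrating lower slots first guarantees the base is already on disk), the following standalone sketch groups toy summaries with a BTreeMap. The types are illustrative stand-ins, not the crate's own.

use std::collections::BTreeMap;

// Hypothetical minimal summary type, for illustration only.
#[derive(Clone, Copy, Debug)]
struct SummaryLite {
    slot: u64,
    state_root: u64, // stand-in for Hash256
}

/// Group summaries by slot in ascending order so that every diff base
/// (always at a lower slot) is migrated before the states that depend on it.
fn summaries_by_slot_ascending(summaries: &[SummaryLite]) -> BTreeMap<u64, Vec<SummaryLite>> {
    let mut by_slot: BTreeMap<u64, Vec<SummaryLite>> = BTreeMap::new();
    for s in summaries {
        by_slot.entry(s.slot).or_default().push(*s);
    }
    by_slot
}

fn main() {
    let summaries = [
        SummaryLite { slot: 96, state_root: 3 },
        SummaryLite { slot: 64, state_root: 2 },
        SummaryLite { slot: 64, state_root: 4 }, // fork sibling at the same slot
        SummaryLite { slot: 32, state_root: 1 },
    ];
    for (slot, group) in summaries_by_slot_ascending(&summaries) {
        // Lower slots come out first, so their snapshots/diffs exist by the
        // time higher-slot states are re-stored against them.
        println!("migrating slot {slot}: {group:?}");
    }
}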
+    let summaries_by_slot = state_summaries_dag.summaries_by_slot_ascending();
+    debug!(
+        log,
+        "Starting hot states migration";
+        "summaries_count" => state_summaries_dag.summaries_count(),
+        "slots_count" => summaries_by_slot.len(),
+        "min_slot" => ?summaries_by_slot.first_key_value().map(|(slot, _)| slot),
+        "max_slot" => ?summaries_by_slot.last_key_value().map(|(slot, _)| slot),
+    );
+
+    // Upgrade all hot DB state summaries to the new type:
+    // - Set all summaries of boundary states as `Snapshot` type
+    // - Set all others as `Replay` pointing to `epoch_boundary_state_root`
+
+    let mut migrate_ops = vec![];
+    let mut diffs_written = 0;
+    let mut summaries_written = 0;
+    let mut last_log_time = Instant::now();
+
+    for (slot, old_hot_state_summaries) in summaries_by_slot {
+        for (state_root, old_summary) in old_hot_state_summaries {
+            // 1. Store snapshot or diff at this slot (if required).
+            // TODO(hdiff): make sure lowest hot hierarchy config is >= 5 to prevent having to
+            // reconstruct states.
+            let storage_strategy = db.hot_storage_strategy(slot)?;
+            debug!(
+                log,
+                "Migrating state summary";
+                "slot" => slot,
+                "state_root" => ?state_root,
+                "storage_strategy" => ?storage_strategy,
+            );
+
+            match storage_strategy {
+                StorageStrategy::DiffFrom(_) | StorageStrategy::Snapshot => {
+                    // Load the full state and re-store it as a snapshot or diff.
+                    let state = get_full_state_v22(&db.hot_db, &state_root, &db.spec)?
+                        .ok_or(Error::MissingState(state_root))?;
+
+                    // Store immediately so that future diffs can load and diff from it.
+                    let mut ops = vec![];
+                    // We must commit the hot state summary immediately, otherwise we can't diff
+                    // against it and future writes will fail. That's why we write the new hot
+                    // summaries in a different column, so that both new and old data are present
+                    // at once. Otherwise, if the process crashes during the migration, the
+                    // database will be broken.
+                    db.store_hot_state_summary(&state_root, &state, &mut ops)?;
+                    db.store_hot_state_diffs(&state_root, &state, &mut ops)?;
+                    db.hot_db.do_atomically(ops)?;
+                    diffs_written += 1;
+                }
+                StorageStrategy::ReplayFrom(_) => {
+                    // Optimization: instead of having to load the state for each summary, we load
+                    // ~32x fewer states by manually computing the HotStateSummary roots using the
+                    // computed state DAG.
+                    //
+                    // No need to store diffs for states that will be reconstructed by replaying
+                    // blocks.
+                    // 2. Convert the summary to the new format.
+                    let latest_block_root = old_summary.latest_block_root;
+                    let previous_state_root = if state_root == split.state_root {
+                        Hash256::ZERO
+                    } else {
+                        state_summaries_dag
+                            .previous_state_root(state_root)
+                            .map_err(|e| {
+                                Error::MigrationError(format!(
+                                    "error computing previous_state_root {e:?}"
+                                ))
+                            })?
+                    };
+
+                    let diff_base_state_root =
+                        if let Some(diff_base_slot) = storage_strategy.diff_base_slot() {
+                            DiffBaseStateRoot::new(
+                                diff_base_slot,
+                                state_summaries_dag
+                                    .ancestor_state_root_at_slot(state_root, diff_base_slot)
+                                    .map_err(|e| {
+                                        Error::MigrationError(format!(
+                                            "error computing ancestor_state_root_at_slot {e:?}"
+                                        ))
+                                    })?,
+                            )
+                        } else {
+                            DiffBaseStateRoot::zero()
+                        };
+
+                    let new_summary = HotStateSummary {
+                        slot,
+                        latest_block_root,
+                        previous_state_root,
+                        diff_base_state_root,
+                    };
+                    let op = new_summary.as_kv_store_op(state_root);
+                    // It's not necessary to immediately commit the summaries of states that are
+                    // ReplayFrom. However, we do so for simplicity.
+                    db.hot_db.do_atomically(vec![op])?;
+                }
+            }
+
+            // 3.
Stage old data for deletion. + if slot % T::EthSpec::slots_per_epoch() == 0 { + let state_key = + get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); + migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key)); + } + + // Delete previous summaries + let state_summary_key = + get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice()); + migrate_ops.push(KeyValueStoreOp::DeleteKey(state_summary_key)); + + summaries_written += 1; + if last_log_time.elapsed() > Duration::from_secs(5) { + last_log_time = Instant::now(); + // TODO(hdiff): Display the slot distance between head and finalized, and head-tracker count + info!( + log, + "Hot states migration in progress"; + "diff_written" => diffs_written, + "summaries_written" => summaries_written, + ); + } + } + } + + // TODO(hdiff): Should run hot DB compaction after deleting potentially a lot of states. Or should wait + // for the next finality event? + info!( + log, + "Hot states migration complete"; + "diff_written" => diffs_written, + "summaries_written" => summaries_written, + ); + + Ok(migrate_ops) +} + +pub fn downgrade_to_v22( + _db: Arc>, + _log: Logger, +) -> Result, Error> { + panic!("downgrade not supported"); +} + +fn new_dag( + db: &HotColdDB, + split_state_root: Hash256, +) -> Result { + // Collect all sumaries for unfinalized states + let state_summaries_v22 = db + .hot_db + // Collect summaries from the legacy V22 column BeaconStateSummary + .iter_column::(DBColumn::BeaconStateSummary) + .map(|res| { + let (key, value) = res?; + let state_root: Hash256 = key; + let summary = HotStateSummaryV22::from_ssz_bytes(&value)?; + Ok(( + state_root, + DAGStateSummaryV22 { + slot: summary.slot, + latest_block_root: summary.latest_block_root, + }, + )) + }) + .collect::, Error>>()?; + + let block_roots = HashSet::::from_iter( + state_summaries_v22 + .iter() + .map(|(_, summary)| summary.latest_block_root), + ); + + // Construct block root to parent block root mapping. + let mut parent_block_roots = HashMap::new(); + for block_root in block_roots { + let blinded_block = db + .get_blinded_block(&block_root)? 
+ .ok_or(Error::MissingBlock(block_root))?; + let parent_root = blinded_block.parent_root(); + parent_block_roots.insert(block_root, parent_root); + } + + StateSummariesDAG::new_from_v22(state_summaries_v22, parent_block_roots, split_state_root) + .map_err(|e| Error::MigrationError(format!("error computing states summaries dag {e:?}"))) +} diff --git a/beacon_node/beacon_chain/src/summaries_dag.rs b/beacon_node/beacon_chain/src/summaries_dag.rs index 0ec1cae7bcb..11da1dce4e6 100644 --- a/beacon_node/beacon_chain/src/summaries_dag.rs +++ b/beacon_node/beacon_chain/src/summaries_dag.rs @@ -196,11 +196,12 @@ impl StateSummariesDAG { } } -impl From for DAGStateSummaryV22 { +impl From for DAGStateSummary { fn from(value: HotStateSummary) -> Self { Self { slot: value.slot, latest_block_root: value.latest_block_root, + previous_state_root: value.previous_state_root, } } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index e1258ccdea7..1922a6ba464 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -486,14 +486,16 @@ async fn epoch_boundary_state_attestation_processing() { .unwrap() .expect("block exists"); let mut epoch_boundary_state = store - .load_epoch_boundary_state(&block.state_root()) + .load_hot_state(&block.state_root()) .expect("no error") - .expect("epoch boundary state exists"); + .expect("epoch boundary state exists") + .0; let ebs_state_root = epoch_boundary_state.update_tree_hash_cache().unwrap(); let mut ebs_of_ebs = store - .load_epoch_boundary_state(&ebs_state_root) + .load_hot_state(&ebs_state_root) .expect("no error") - .expect("ebs of ebs exists"); + .expect("ebs of ebs exists") + .0; ebs_of_ebs.apply_pending_mutations().unwrap(); assert_eq!(epoch_boundary_state, ebs_of_ebs); diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 4f675305706..8b0bf47ea36 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -53,7 +53,7 @@ pub struct StoreConfig { /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. #[superstruct( - variants(V1, V22), + variants(V1, V22, V23), variant_attributes(derive(Debug, Clone, PartialEq, Eq, Encode, Decode)) )] #[derive(Clone, Debug, PartialEq, Eq)] @@ -61,10 +61,11 @@ pub struct OnDiskStoreConfig { #[superstruct(only(V1))] pub slots_per_restore_point: u64, /// Prefix byte to future-proof versions of the `OnDiskStoreConfig` post V1 - #[superstruct(only(V22))] + #[superstruct(only(V22, V23))] version_byte: u8, - #[superstruct(only(V22))] + #[superstruct(only(V22, V23))] pub hierarchy_config: HierarchyConfig, + // TODO(hdiff): Should persist the hot hierarchy_config too? 
} impl OnDiskStoreConfigV22 { @@ -210,6 +211,7 @@ impl StoreItem for OnDiskStoreConfig { match self { OnDiskStoreConfig::V1(value) => value.as_ssz_bytes(), OnDiskStoreConfig::V22(value) => value.as_ssz_bytes(), + OnDiskStoreConfig::V23(value) => value.as_ssz_bytes(), } } @@ -221,6 +223,8 @@ impl StoreItem for OnDiskStoreConfig { return Ok(Self::V1(value)); } + // TODO(hdiff): handle V23 conversion + Ok(Self::V22(OnDiskStoreConfigV22::from_ssz_bytes(bytes)?)) } } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6bb4edee6b2..75a6ea8d49e 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -45,6 +45,8 @@ pub enum Error { expected: Hash256, computed: Hash256, }, + MissingState(Hash256), + MissingHotStateSummary(Hash256), MissingGenesisState, MissingSnapshot(Slot), BlockReplayError(BlockReplayError), @@ -69,6 +71,18 @@ pub enum Error { RandaoMixOutOfBounds, GenesisStateUnknown, ArithError(safe_arith::ArithError), + MissmatchDiffBaseStateRoot { + expected_slot: Slot, + stored_slot: Slot, + }, + MigrationError(String), + LoadAnchorInfo(Box), + LoadSplit(Box), + LoadBlobInfo(Box), + LoadDataColumnInfo(Box), + LoadConfig(Box), + LoadHotStateSummary(Hash256, Box), + LoadHotStateSummaryForSplit(Box), } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 5f8ed8f5e73..0a830c3f7ea 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -20,6 +20,7 @@ where self.iter_temporary_state_roots() .try_fold(vec![], |mut ops, state_root| { let state_root = state_root?; + // This states will never be used, safe to delete the hot diffs ops.push(StoreOp::DeleteState(state_root, None)); Result::<_, Error>::Ok(ops) })?; diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index a29e680eb51..6c24d9a1f09 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -27,6 +27,7 @@ pub enum Error { Compression(std::io::Error), InvalidSszState(ssz::DecodeError), InvalidBalancesLength, + LessThanStart(Slot, Slot), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] @@ -67,6 +68,10 @@ impl FromStr for HierarchyConfig { return Err("hierarchy-exponents must be in ascending order".to_string()); } + if exponents.is_empty() { + return Err("empty exponents".to_string()); + } + Ok(HierarchyConfig { exponents }) } } @@ -616,7 +621,19 @@ impl HierarchyConfig { } impl HierarchyModuli { - pub fn storage_strategy(&self, slot: Slot) -> Result { + /// * `slot` - Slot of the storage strategy + /// * `start_slot` - Slot before which states are not available. Initial snapshot point, which + /// may not be aligned to the hierarchy moduli values. Given an example of + /// exponents [5,13,21], to reconstruct state at slot 3,000,003: if start = 3,000,002 + /// layer 2 diff will point to the start snapshot instead of the layer 1 diff at + /// 2998272. 
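The clamping behaviour described in the doc comment above can be reproduced with a small standalone sketch. The moduli below (32, 8192, 2097152) are derived from the example exponents [5, 13, 21]; the function is a simplified stand-in for the `storage_strategy` implementation that follows, not the crate's own code.

// Standalone sketch of start_slot clamping, assuming default-like moduli.
#[derive(Debug, PartialEq)]
enum Strategy {
    Snapshot,
    DiffFrom(u64),
    ReplayFrom(u64),
}

fn storage_strategy(slot: u64, start_slot: u64, moduli: &[u64]) -> Result<Strategy, String> {
    if slot < start_slot {
        return Err(format!("slot {slot} is before start {start_slot}"));
    }
    if slot == start_slot {
        // The initial (anchor) state is always stored as a full snapshot.
        return Ok(Strategy::Snapshot);
    }
    let last = *moduli.last().ok_or("empty moduli")?;
    let first = *moduli.first().ok_or("empty moduli")?;
    if slot % last == 0 {
        return Ok(Strategy::Snapshot);
    }
    // Walk layers from the largest modulus down, diffing each layer against
    // the one above it, but never against a point older than `start_slot`.
    for window in moduli.windows(2).rev() {
        let (n_small, n_big) = (window[0], window[1]);
        if slot % n_small == 0 {
            let from = (slot / n_big) * n_big;
            return Ok(Strategy::DiffFrom(from.max(start_slot)));
        }
    }
    // Not on any diff layer: replay blocks from the most frequent layer,
    // again clamped to the start snapshot.
    let from = (slot / first) * first;
    Ok(Strategy::ReplayFrom(from.max(start_slot)))
}

fn main() {
    let moduli = [32, 8192, 2_097_152];
    // With an anchor at 3_000_002, slot 3_000_003 replays from the anchor
    // snapshot instead of the (non-existent) layer point at 3_000_000.
    assert_eq!(
        storage_strategy(3_000_003, 3_000_002, &moduli),
        Ok(Strategy::ReplayFrom(3_000_002))
    );
    // With an anchor at genesis the same slot replays from 3_000_000.
    assert_eq!(
        storage_strategy(3_000_003, 0, &moduli),
        Ok(Strategy::ReplayFrom(3_000_000))
    );
    println!("storage strategy clamping behaves as expected");
}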
+ pub fn storage_strategy(&self, slot: Slot, start_slot: Slot) -> Result { + match slot.cmp(&start_slot) { + Ordering::Less => return Err(Error::LessThanStart(slot, start_slot)), + Ordering::Equal => return Ok(StorageStrategy::Snapshot), + Ordering::Greater => {} // continue + } + // last = full snapshot interval let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?; // first = most frequent diff layer, need to replay blocks from this layer @@ -638,14 +655,22 @@ impl HierarchyModuli { .find_map(|(&n_big, &n_small)| { if slot % n_small == 0 { // Diff from the previous layer. - Some(StorageStrategy::DiffFrom(slot / n_big * n_big)) + let from = slot / n_big * n_big; + // Or from start point + let from = std::cmp::max(from, start_slot); + Some(StorageStrategy::DiffFrom(from)) } else { // Keep trying with next layer None } }) // Exhausted layers, need to replay from most frequent layer - .unwrap_or(StorageStrategy::ReplayFrom(slot / first * first))) + .unwrap_or_else(|| { + let from = slot / first * first; + // Or from start point + let from = std::cmp::max(from, start_slot); + StorageStrategy::ReplayFrom(from) + })) } /// Return the smallest slot greater than or equal to `slot` at which a full snapshot should @@ -674,6 +699,18 @@ impl HierarchyModuli { |second_layer_moduli| Ok(slot % *second_layer_moduli == 0), ) } + + /// For each layer, returns the closest diff less that or equal to `slot`. + pub fn closest_layer_points(&self, slot: Slot, start_slot: Slot) -> Vec { + self.moduli + .iter() + .map(|&n| { + let from = slot / n * n; + // Or from start point + std::cmp::max(from, start_slot) + }) + .collect() + } } impl StorageStrategy { @@ -703,6 +740,15 @@ impl StorageStrategy { } .map(Slot::from) } + + /// Returns the slot that storage_strategy points to. + pub fn diff_base_slot(&self) -> Option { + match self { + Self::ReplayFrom(from) => Some(*from), + Self::DiffFrom(from) => Some(*from), + Self::Snapshot => None, + } + } } #[cfg(test)] @@ -714,34 +760,37 @@ mod tests { fn default_storage_strategy() { let config = HierarchyConfig::default(); config.validate().unwrap(); + let sslot = Slot::new(0); let moduli = config.to_moduli().unwrap(); // Full snapshots at multiples of 2^21. let snapshot_freq = Slot::new(1 << 21); assert_eq!( - moduli.storage_strategy(Slot::new(0)).unwrap(), + moduli.storage_strategy(Slot::new(0), sslot).unwrap(), StorageStrategy::Snapshot ); assert_eq!( - moduli.storage_strategy(snapshot_freq).unwrap(), + moduli.storage_strategy(snapshot_freq, sslot).unwrap(), StorageStrategy::Snapshot ); assert_eq!( - moduli.storage_strategy(snapshot_freq * 3).unwrap(), + moduli.storage_strategy(snapshot_freq * 3, sslot).unwrap(), StorageStrategy::Snapshot ); // Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer. 
let first_layer = Slot::new(1 << 18); assert_eq!( - moduli.storage_strategy(first_layer * 2).unwrap(), + moduli.storage_strategy(first_layer * 2, sslot).unwrap(), StorageStrategy::DiffFrom(Slot::new(0)) ); let replay_strategy_slot = first_layer + 1; assert_eq!( - moduli.storage_strategy(replay_strategy_slot).unwrap(), + moduli + .storage_strategy(replay_strategy_slot, sslot) + .unwrap(), StorageStrategy::ReplayFrom(first_layer) ); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e22b5001d63..a6aecd5c64f 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2,15 +2,14 @@ use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hdiff::{HDiff, HDiffBuffer, HierarchyModuli, StorageStrategy}; use crate::historic_state_cache::HistoricStateCache; -use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; use crate::metadata::{ - AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion, + AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, - PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, + SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, }; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ @@ -32,7 +31,7 @@ use state_processing::{ SlotProcessingError, }; use std::cmp::min; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::io::{Read, Write}; use std::marker::PhantomData; use std::num::NonZeroUsize; @@ -62,6 +61,7 @@ pub struct HotColdDB, Cold: ItemStore> { data_column_info: RwLock, pub(crate) config: StoreConfig, pub(crate) hierarchy: HierarchyModuli, + pub hierarchy_hot: HierarchyModuli, /// Cold database containing compact historical data. pub cold_db: Cold, /// Database containing blobs. If None, store falls back to use `cold_db`. @@ -82,7 +82,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// HTTP API. historic_state_cache: Mutex>, /// Chain spec. - pub(crate) spec: Arc, + pub spec: Arc, /// Logger. pub log: Logger, /// Mere vessel for E. 
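As a rough illustration of why the store now carries two hierarchies (the freezer hierarchy is anchored at slot 0, while the hot hierarchy added above as `hierarchy_hot` is anchored at the anchor slot, cf. the `hot_hdiff_start_slot` helper later in this file), here is a toy sketch with a single snapshot interval per hierarchy. All names and numbers are illustrative assumptions, not the crate's API.

// Toy model of the hot/cold hierarchy split, for illustration only.
#[derive(Clone, Copy)]
struct Moduli {
    snapshot_every: u64,
}

impl Moduli {
    /// Closest grid point at or below `slot`, clamped to `start_slot`.
    fn base_point(&self, slot: u64, start_slot: u64) -> u64 {
        ((slot / self.snapshot_every) * self.snapshot_every).max(start_slot)
    }
}

struct Store {
    hierarchy: Moduli,     // cold / freezer layers
    hierarchy_hot: Moduli, // hot layers (may later use different exponents)
    anchor_slot: u64,      // set from the checkpoint state at startup
}

impl Store {
    fn cold_base(&self, slot: u64) -> u64 {
        // Freezer states are always aligned to a grid that begins at slot 0.
        self.hierarchy.base_point(slot, 0)
    }
    fn hot_base(&self, slot: u64) -> u64 {
        // Hot states cannot reach back past the anchor, so the grid is
        // clamped to the anchor slot.
        self.hierarchy_hot.base_point(slot, self.anchor_slot)
    }
}

fn main() {
    let store = Store {
        hierarchy: Moduli { snapshot_every: 8192 },
        hierarchy_hot: Moduli { snapshot_every: 32 },
        anchor_slot: 3_000_002,
    };
    assert_eq!(store.cold_base(3_000_100), 2_998_272);
    assert_eq!(store.hot_base(3_000_003), 3_000_002);
    println!("cold grid starts at 0, hot grid starts at the anchor");
}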
@@ -160,10 +160,15 @@ pub enum HotColdDBError { MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), MissingHotStateSummary(Hash256), + MissingHotState { + state_root: Hash256, + requested_by_state_summary: (Hash256, Slot), + }, + MissingHotStateSnapshot(Hash256, Slot), MissingEpochBoundaryState(Hash256), MissingPrevState(Hash256), MissingSplitState(Hash256, Slot), - MissingStateDiff(Hash256), + MissingHotHDiff(Hash256), MissingHDiff(Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), @@ -205,6 +210,9 @@ impl HotColdDB, MemoryStore> { config.verify::()?; let hierarchy = config.hierarchy_config.to_moduli()?; + // TODO(hdiff): Use different exponents + let hierarchy_hot = config.hierarchy_config.to_moduli()?; + warn!(log, "Using improper start slot 0"); let db = HotColdDB { split: RwLock::new(Split::default()), @@ -222,6 +230,7 @@ impl HotColdDB, MemoryStore> { )), config, hierarchy, + hierarchy_hot, spec, log, _phantom: PhantomData, @@ -245,12 +254,18 @@ impl HotColdDB, LevelDB> { spec: Arc, log: Logger, ) -> Result, Error> { + debug!(log, "Opening HotColdDB"); config.verify::()?; let hierarchy = config.hierarchy_config.to_moduli()?; + // TODO(hdiff): Use different exponents + let hierarchy_hot = config.hierarchy_config.to_moduli()?; + + debug!(log, "Opening LevelDB"; "hot_path" => ?hot_path); let hot_db = LevelDB::open(hot_path)?; let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?); + debug!(log, "Loaded anchor info"; "anchor_info" => ?anchor_info); let db = HotColdDB { split: RwLock::new(Split::default()), @@ -268,6 +283,7 @@ impl HotColdDB, LevelDB> { )), config, hierarchy, + hierarchy_hot, spec, log, _phantom: PhantomData, @@ -280,7 +296,11 @@ impl HotColdDB, LevelDB> { // Load the previous split slot from the database (if any). This ensures we can // stop and restart correctly. This needs to occur *before* running any migrations // because some migrations load states and depend on the split. - if let Some(split) = db.load_split()? { + // + // V23: `load_split` needs to load a hot state summary, which need to be migrated from V22 + // to V23. Attempting to `load_split` here before the migration will trigger an SSZ decode + // error. Instead we load the partial split, and load the full split after the migration. + if let Some(split) = db.load_split_partial()? { *db.split.write() = split; info!( @@ -367,11 +387,24 @@ impl HotColdDB, LevelDB> { "from_version" => schema_version.as_u64(), "to_version" => CURRENT_SCHEMA_VERSION.as_u64(), ); - migrate_schema(db.clone(), schema_version, CURRENT_SCHEMA_VERSION)?; + migrate_schema(db.clone(), schema_version, CURRENT_SCHEMA_VERSION).map_err(|e| { + Error::MigrationError(format!( + "Migrating from {:?} to {:?}: {:?}", + schema_version, CURRENT_SCHEMA_VERSION, e + )) + })?; } else { db.store_schema_version(CURRENT_SCHEMA_VERSION)?; } + // Load the full split after the migration to set the `split.block_root` to the correct + // value. + if let Some(split) = db.load_split()? { + *db.split.write() = split; + + debug!(db.log, "Hot-Cold DB split initialized"; "split" => ?split); + } + // Ensure that any on-disk config is compatible with the supplied config. if let Some(disk_config) = db.load_config()? 
{ let split = db.get_split_info(); @@ -403,6 +436,8 @@ impl HotColdDB, LevelDB> { info!(db.log, "Foreground compaction complete"); } + debug!(db.log, "Store anchor info"; "anchor" => ?db.get_anchor_info()); + Ok(db) } @@ -429,6 +464,21 @@ impl HotColdDB, LevelDB> { } impl, Cold: ItemStore> HotColdDB { + fn cold_storage_strategy(&self, slot: Slot) -> Result { + // The start slot for the freezer HDiff is always 0 + Ok(self.hierarchy.storage_strategy(slot, Slot::new(0))?) + } + + pub fn hot_storage_strategy(&self, slot: Slot) -> Result { + Ok(self + .hierarchy_hot + .storage_strategy(slot, self.hot_hdiff_start_slot())?) + } + + pub fn hot_hdiff_start_slot(&self) -> Slot { + self.anchor_info.read_recursive().anchor_slot + } + pub fn update_finalized_state( &self, state_root: Hash256, @@ -894,14 +944,6 @@ impl, Cold: ItemStore> HotColdDB } } - pub fn put_state_summary( - &self, - state_root: &Hash256, - summary: HotStateSummary, - ) -> Result<(), Error> { - self.hot_db.put(state_root, &summary).map_err(Into::into) - } - /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { self.put_state_possibly_temporary(state_root, state, false) @@ -1119,36 +1161,6 @@ impl, Cold: ItemStore> HotColdDB ) } - /// Load an epoch boundary state by using the hot state summary look-up. - /// - /// Will fall back to the cold DB if a hot state summary is not found. - pub fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - if let Some(HotStateSummary { - epoch_boundary_state_root, - .. - }) = self.load_hot_state_summary(state_root)? - { - // NOTE: minor inefficiency here because we load an unnecessary hot state summary - let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; - Ok(Some(state)) - } else { - // Try the cold DB - match self.load_cold_state_slot(state_root)? { - Some(state_slot) => { - let epoch_boundary_slot = - state_slot / E::slots_per_epoch() * E::slots_per_epoch(); - self.load_cold_state_by_slot(epoch_boundary_slot).map(Some) - } - None => Ok(None), - } - } - } - pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } @@ -1229,8 +1241,10 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteState(state_root, slot) => { // Delete the hot state summary. - let state_summary_key = - get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice()); + let state_summary_key = get_key_for_col( + DBColumn::BeaconStateHotSummary.into(), + state_root.as_slice(), + ); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); // Delete the state temporary flag (if any). Temporary flags are commonly @@ -1241,10 +1255,36 @@ impl, Cold: ItemStore> HotColdDB ); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key)); - if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) { - let state_key = - get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); + if let Some(slot) = slot { + match self.hot_storage_strategy(slot)? 
{ + StorageStrategy::Snapshot => { + // Full state stored in this position + key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconStateHotSnapshot.into(), + state_root.as_slice(), + ))); + } + StorageStrategy::DiffFrom(_) => { + // Diff stored in this position + key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconStateHotDiff.into(), + state_root.as_slice(), + ))); + } + StorageStrategy::ReplayFrom(_) => { + // Nothing else to delete + } + } + } else { + // TODO(hdiff): should attempt to delete everything if slot is not available? + key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconStateHotSnapshot.into(), + state_root.as_slice(), + ))); + key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconStateHotDiff.into(), + state_root.as_slice(), + ))); } } @@ -1441,17 +1481,14 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { - // Put the state in the cache. - let block_root = state.get_latest_block_root(*state_root); - // Avoid storing states in the database if they already exist in the state cache. // The exception to this is the finalized state, which must exist in the cache before it // is stored on disk. - if let PutStateOutcome::Duplicate = - self.state_cache - .lock() - .put_state(*state_root, block_root, state)? - { + if let PutStateOutcome::Duplicate = self.state_cache.lock().put_state( + *state_root, + state.get_latest_block_root(*state_root), + state, + )? { debug!( self.log, "Skipping storage of cached state"; @@ -1461,27 +1498,101 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); } - // On the epoch boundary, store the full state. - if state.slot() % E::slots_per_epoch() == 0 { - trace!( - self.log, - "Storing full state on epoch boundary"; - "slot" => state.slot().as_u64(), - "state_root" => format!("{:?}", state_root) - ); - store_full_state(state_root, state, ops)?; - } + self.store_hot_state_summary(state_root, state, ops)?; + self.store_hot_state_diffs(state_root, state, ops)?; + // TODO(hdiff): to debug + debug!( + self.log, + "Stored hot state summary and diffs"; + "state_root" => ?state_root, + "slot" => state.slot(), + "storage_strategy" => ?self.hot_storage_strategy(state.slot())? + ); + + Ok(()) + } + + /// Store a post-finalization state efficiently in the hot database. + pub fn store_hot_state_summary( + &self, + state_root: &Hash256, + state: &BeaconState, + ops: &mut Vec, + ) -> Result<(), Error> { // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. - let hot_state_summary = HotStateSummary::new(state_root, state)?; - let op = hot_state_summary.as_kv_store_op(*state_root); - ops.push(op); + let hot_state_summary = + HotStateSummary::new(state_root, state, self.hot_storage_strategy(state.slot())?)?; + ops.push(hot_state_summary.as_kv_store_op(*state_root)); + Ok(()) + } + + pub fn store_hot_state_diffs( + &self, + state_root: &Hash256, + state: &BeaconState, + ops: &mut Vec, + ) -> Result<(), Error> { + let slot = state.slot(); + match self.hot_storage_strategy(slot)? 
{ + StorageStrategy::ReplayFrom(from_slot) => { + debug!( + self.log, + "Storing hot state"; + "strategy" => "replay", + "from_slot" => from_slot, + "slot" => slot, + ); + // Already have persisted the state summary, don't persist anything else + } + StorageStrategy::Snapshot => { + debug!( + self.log, + "Storing hot state"; + "strategy" => "snapshot", + "slot" => slot, + ); + self.store_hot_state_as_snapshot(state_root, state, ops)?; + } + StorageStrategy::DiffFrom(from_slot) => { + debug!( + self.log, + "Storing hot state"; + "strategy" => "diff", + "from_slot" => from_slot, + "slot" => slot, + ); + // TODO(hdiff): Max distance in a diff layer must be less than SlotsPerHistoricalRoot + // we should fix this by using the state summary iterator to go back + let from_root = *state + .get_state_root(from_slot) + .map_err(HotColdDBError::HotStateSummaryError)?; + + self.store_hot_state_as_diff(state_root, state, from_root, ops)?; + } + } Ok(()) } + fn store_hot_state_as_diff( + &self, + state_root: &Hash256, + state: &BeaconState, + from_root: Hash256, + ops: &mut Vec, + ) -> Result<(), Error> { + let base_buffer = self.load_hot_hdiff_buffer(from_root)?; + let target_buffer = HDiffBuffer::from_state(state.clone()); + let diff = HDiff::compute(&base_buffer, &target_buffer, &self.config)?; + let diff_bytes = diff.as_ssz_bytes(); + let key = get_key_for_col(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice()); + ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes)); + Ok(()) + } + /// Get a post-finalization state from the database or store. pub fn get_hot_state(&self, state_root: &Hash256) -> Result>, Error> { if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { @@ -1517,6 +1628,67 @@ impl, Cold: ItemStore> HotColdDB } } + fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result { + // FIXME(tree-states): Add cache of hot hdiff buffers + let Some(HotStateSummary { + slot, + diff_base_state_root, + .. + }) = self.load_hot_state_summary(&state_root)? + else { + let mut existing_summaries = self + .load_hot_state_summaries()? + .into_iter() + .map(|(state_root, summary)| (state_root, summary.slot)) + .collect::>(); + existing_summaries.sort_by(|a, b| a.1.cmp(&b.1)); + // Hot summaries should never be missing, dump the current list of summaries to ease debug + debug!( + self.log, + "MissingHotStateSummary"; + "requested" => ?state_root, + "existing_summaries" => ?existing_summaries, + ); + return Err(Error::MissingHotStateSummary(state_root)); + }; + + match self.hot_storage_strategy(slot)? { + StorageStrategy::Snapshot => { + // FIXME(tree-states): rename error + let state = self + .load_hot_state_as_snapshot(state_root)? + .ok_or(HotColdDBError::MissingHotStateSnapshot(state_root, slot))?; + let buffer = HDiffBuffer::from_state(state); + Ok(buffer) + } + StorageStrategy::DiffFrom(from_slot) => { + let from_state_root = diff_base_state_root.get_root(from_slot)?; + let mut buffer = self.load_hot_hdiff_buffer(from_state_root)?; + let diff = self.load_hot_hdiff(state_root)?; + diff.apply(&mut buffer, &self.config)?; + Ok(buffer) + } + StorageStrategy::ReplayFrom(from_slot) => { + let from_state_root = diff_base_state_root.get_root(from_slot)?; + self.load_hot_hdiff_buffer(from_state_root) + } + } + } + + fn load_hot_hdiff(&self, state_root: Hash256) -> Result { + let bytes = { + let _t = metrics::start_timer(&metrics::BEACON_HDIFF_READ_TIMES); + self.hot_db + .get_bytes(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice())? 
+ .ok_or(HotColdDBError::MissingHotHDiff(state_root))? + }; + let hdiff = { + let _t = metrics::start_timer(&metrics::BEACON_HDIFF_DECODE_TIMES); + HDiff::from_ssz_bytes(&bytes)? + }; + Ok(hdiff) + } + /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. @@ -1538,59 +1710,33 @@ impl, Cold: ItemStore> HotColdDB if let Some(HotStateSummary { slot, latest_block_root, - epoch_boundary_state_root, + diff_base_state_root, + .. }) = self.load_hot_state_summary(state_root)? { - let mut boundary_state = - get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; + let mut state = match self.hot_storage_strategy(slot)? { + StorageStrategy::Snapshot | StorageStrategy::DiffFrom(_) => { + let buffer = self.load_hot_hdiff_buffer(*state_root)?; + buffer.as_state(&self.spec)? + } + StorageStrategy::ReplayFrom(from_slot) => { + let from_state_root = diff_base_state_root.get_root(from_slot)?; + + let (mut base_state, _) = self.load_hot_state(&from_state_root)?.ok_or( + HotColdDBError::MissingHotState { + state_root: from_state_root, + requested_by_state_summary: (*state_root, slot), + }, + )?; - // Immediately rebase the state from disk on the finalized state so that we can reuse - // parts of the tree for state root calculation in `replay_blocks`. - self.state_cache - .lock() - .rebase_on_finalized(&mut boundary_state, &self.spec)?; + // Immediately rebase the state from disk on the finalized state so that we can + // reuse parts of the tree for state root calculation in `replay_blocks`. + self.state_cache + .lock() + .rebase_on_finalized(&mut base_state, &self.spec)?; - // Optimization to avoid even *thinking* about replaying blocks if we're already - // on an epoch boundary. - let mut state = if slot % E::slots_per_epoch() == 0 { - boundary_state - } else { - // Cache ALL intermediate states that are reached during block replay. We may want - // to restrict this in future to only cache epoch boundary states. At worst we will - // cache up to 32 states for each state loaded, which should not flush out the cache - // entirely. - let state_cache_hook = |state_root, state: &mut BeaconState| { - // Ensure all caches are built before attempting to cache. - state.update_tree_hash_cache()?; - state.build_all_caches(&self.spec)?; - - let latest_block_root = state.get_latest_block_root(state_root); - if let PutStateOutcome::New = - self.state_cache - .lock() - .put_state(state_root, latest_block_root, state)? - { - debug!( - self.log, - "Cached ancestor state"; - "state_root" => ?state_root, - "slot" => slot, - ); - } - Ok(()) - }; - let blocks = - self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?; - let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME); - self.replay_blocks( - boundary_state, - blocks, - slot, - no_state_root_iter(), - Some(Box::new(state_cache_hook)), - )? + self.load_hot_state_using_replay(base_state, slot, latest_block_root)? 
+ } }; state.apply_pending_mutations()?; @@ -1600,6 +1746,22 @@ impl, Cold: ItemStore> HotColdDB } } + fn load_hot_state_using_replay( + &self, + base_state: BeaconState, + slot: Slot, + latest_block_root: Hash256, + ) -> Result, Error> { + if base_state.slot() == slot { + return Ok(base_state); + } + + let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?; + let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME); + // FIXME(tree-states): reconsider caching + self.replay_blocks(base_state, blocks, slot, no_state_root_iter(), None) + } + pub fn store_cold_state_summary( &self, state_root: &Hash256, @@ -1627,14 +1789,14 @@ impl, Cold: ItemStore> HotColdDB self.store_cold_state_summary(state_root, state.slot(), ops)?; let slot = state.slot(); - match self.hierarchy.storage_strategy(slot)? { + match self.cold_storage_strategy(slot)? { StorageStrategy::ReplayFrom(from) => { debug!( self.log, "Storing cold state"; "strategy" => "replay", "from_slot" => from, - "slot" => state.slot(), + "slot" => slot, ); // Already have persisted the state summary, don't persist anything else } @@ -1643,7 +1805,7 @@ impl, Cold: ItemStore> HotColdDB self.log, "Storing cold state"; "strategy" => "snapshot", - "slot" => state.slot(), + "slot" => slot, ); self.store_cold_state_as_snapshot(state, ops)?; } @@ -1653,7 +1815,7 @@ impl, Cold: ItemStore> HotColdDB "Storing cold state"; "strategy" => "diff", "from_slot" => from, - "slot" => state.slot(), + "slot" => slot, ); self.store_cold_state_as_diff(state, from, ops)?; } @@ -1706,6 +1868,54 @@ impl, Cold: ItemStore> HotColdDB } } + pub fn store_hot_state_as_snapshot( + &self, + state_root: &Hash256, + state: &BeaconState, + ops: &mut Vec, + ) -> Result<(), Error> { + let bytes = state.as_ssz_bytes(); + let compressed_value = { + let _timer = metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_COMPRESS_TIME); + let mut out = Vec::with_capacity(self.config.estimate_compressed_size(bytes.len())); + let mut encoder = Encoder::new(&mut out, self.config.compression_level) + .map_err(Error::Compression)?; + encoder.write_all(&bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + out + }; + + let key = get_key_for_col( + DBColumn::BeaconStateHotSnapshot.into(), + state_root.as_slice(), + ); + ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value)); + Ok(()) + } + + fn load_hot_state_bytes_as_snapshot( + &self, + state_root: Hash256, + ) -> Result>, Error> { + match self.hot_db.get_bytes( + DBColumn::BeaconStateHotSnapshot.into(), + state_root.as_slice(), + )? { + Some(bytes) => { + let _timer = + metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME); + let mut ssz_bytes = + Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(Some(ssz_bytes)) + } + None => Ok(None), + } + } + fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result>, Error> { Ok(self .load_cold_state_bytes_as_snapshot(slot)? @@ -1713,6 +1923,16 @@ impl, Cold: ItemStore> HotColdDB .transpose()?) } + fn load_hot_state_as_snapshot( + &self, + state_root: Hash256, + ) -> Result>, Error> { + Ok(self + .load_hot_state_bytes_as_snapshot(state_root)? + .map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec)) + .transpose()?) 
+ } + pub fn store_cold_state_as_diff( &self, state: &BeaconState, @@ -1753,7 +1973,7 @@ impl, Cold: ItemStore> HotColdDB /// /// Will reconstruct the state if it lies between restore points. pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result, Error> { - let storage_strategy = self.hierarchy.storage_strategy(slot)?; + let storage_strategy = self.cold_storage_strategy(slot)?; // Search for a state from this slot or a recent prior slot in the historic state cache. let mut historic_state_cache = self.historic_state_cache.lock(); @@ -1782,7 +2002,7 @@ impl, Cold: ItemStore> HotColdDB // Load using the diff hierarchy. For states that require replay we recurse into this // function so that we can try to get their pre-state *as a state* rather than an hdiff // buffer. - match self.hierarchy.storage_strategy(slot)? { + match self.cold_storage_strategy(slot)? { StorageStrategy::Snapshot | StorageStrategy::DiffFrom(_) => { let buffer_timer = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_LOAD_TIME); @@ -1888,7 +2108,7 @@ impl, Cold: ItemStore> HotColdDB // Load buffer for the previous state. // This amount of recursion (<10 levels) should be OK. let t = std::time::Instant::now(); - match self.hierarchy.storage_strategy(slot)? { + match self.cold_storage_strategy(slot)? { // Base case. StorageStrategy::Snapshot => { let state = self @@ -2163,10 +2383,9 @@ impl, Cold: ItemStore> HotColdDB pub fn init_anchor_info( &self, block: BeaconBlockRef<'_, E>, + anchor_slot: Slot, retain_historic_states: bool, ) -> Result { - let anchor_slot = block.slot(); - // Set the `state_upper_limit` to the slot of the *next* checkpoint. let next_snapshot_slot = self.hierarchy.next_snapshot_slot(anchor_slot)?; let state_upper_limit = if !retain_historic_states { @@ -2231,7 +2450,8 @@ impl, Cold: ItemStore> HotColdDB /// Load the anchor info from disk. fn load_anchor_info(hot_db: &Hot) -> Result { Ok(hot_db - .get(&ANCHOR_INFO_KEY)? + .get(&ANCHOR_INFO_KEY) + .map_err(|e| Error::LoadAnchorInfo(e.into()))? .unwrap_or(ANCHOR_UNINITIALIZED)) } @@ -2314,7 +2534,9 @@ impl, Cold: ItemStore> HotColdDB /// Load the blob info from disk, but do not set `self.blob_info`. fn load_blob_info(&self) -> Result, Error> { - self.hot_db.get(&BLOB_INFO_KEY) + self.hot_db + .get(&BLOB_INFO_KEY) + .map_err(|e| Error::LoadBlobInfo(e.into())) } /// Store the given `blob_info` to disk. @@ -2359,7 +2581,9 @@ impl, Cold: ItemStore> HotColdDB /// Load the blob info from disk, but do not set `self.data_column_info`. fn load_data_column_info(&self) -> Result, Error> { - self.hot_db.get(&DATA_COLUMN_INFO_KEY) + self.hot_db + .get(&DATA_COLUMN_INFO_KEY) + .map_err(|e| Error::LoadDataColumnInfo(e.into())) } /// Store the given `data_column_info` to disk. @@ -2418,7 +2642,9 @@ impl, Cold: ItemStore> HotColdDB /// Load previously-stored config from disk. fn load_config(&self) -> Result, Error> { - self.hot_db.get(&CONFIG_KEY) + self.hot_db + .get(&CONFIG_KEY) + .map_err(|e| Error::LoadConfig(e.into())) } /// Write the config to disk. @@ -2428,17 +2654,24 @@ impl, Cold: ItemStore> HotColdDB /// Load the split point from disk, sans block root. fn load_split_partial(&self) -> Result, Error> { - self.hot_db.get(&SPLIT_KEY) + self.hot_db + .get(&SPLIT_KEY) + .map_err(|e| Error::LoadSplit(e.into())) } /// Load the split point from disk, including block root. fn load_split(&self) -> Result, Error> { match self.load_split_partial()? 
{ Some(mut split) => { + debug!(self.log, "Loaded split partial"; "split" => ?split); // Load the hot state summary to get the block root. - let summary = self.load_hot_state_summary(&split.state_root)?.ok_or( - HotColdDBError::MissingSplitState(split.state_root, split.slot), - )?; + let summary = self + .load_hot_state_summary(&split.state_root) + .map_err(|e| Error::LoadHotStateSummaryForSplit(e.into()))? + .ok_or(HotColdDBError::MissingSplitState( + split.state_root, + split.slot, + ))?; split.block_root = summary.latest_block_root; Ok(Some(split)) } @@ -2464,13 +2697,15 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, ) -> Result, Error> { - self.hot_db.get(state_root) + self.hot_db + .get(state_root) + .map_err(|e| Error::LoadHotStateSummary(*state_root, e.into())) } /// Load all hot state summaries present in the hot DB pub fn load_hot_state_summaries(&self) -> Result, Error> { self.hot_db - .iter_column::(DBColumn::BeaconStateSummary) + .iter_column::(DBColumn::BeaconStateHotSummary) .map(|res| { let (state_root, value) = res?; let summary = HotStateSummary::from_ssz_bytes(&value)?; @@ -2540,25 +2775,6 @@ impl, Cold: ItemStore> HotColdDB self.config.compact_on_prune } - /// Load the checkpoint to begin pruning from (the "old finalized checkpoint"). - pub fn load_pruning_checkpoint(&self) -> Result, Error> { - Ok(self - .hot_db - .get(&PRUNING_CHECKPOINT_KEY)? - .map(|pc: PruningCheckpoint| pc.checkpoint)) - } - - /// Store the checkpoint to begin pruning from (the "old finalized checkpoint"). - pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> { - self.hot_db - .do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)]) - } - - /// Create a staged store for the pruning checkpoint. - pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp { - PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY) - } - /// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch. pub fn load_compaction_timestamp(&self) -> Result, Error> { Ok(self @@ -2641,7 +2857,7 @@ impl, Cold: ItemStore> HotColdDB "Pruning finalized payloads"; "info" => "you may notice degraded I/O performance while this runs" ); - let anchor_slot = self.get_anchor_info().anchor_slot; + let anchor_info = self.get_anchor_info(); let mut ops = vec![]; let mut last_pruned_block_root = None; @@ -2682,10 +2898,10 @@ impl, Cold: ItemStore> HotColdDB ops.push(StoreOp::DeleteExecutionPayload(block_root)); } - if slot == anchor_slot { + if slot < anchor_info.oldest_block_slot { info!( self.log, - "Payload pruning reached anchor state"; + "Payload pruning reached anchor oldest block slot"; "slot" => slot ); break; @@ -2914,6 +3130,7 @@ impl, Cold: ItemStore> HotColdDB let mut cold_ops = vec![]; let current_schema_columns = vec![ + DBColumn::BeaconStateHotSummary, DBColumn::BeaconColdStateSummary, DBColumn::BeaconStateSnapshot, DBColumn::BeaconStateDiff, @@ -2972,57 +3189,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - - /// Prune states from the hot database which are prior to the split. - /// - /// This routine is important for cleaning up advanced states which are stored in the database - /// with a temporary flag. 
- pub fn prune_old_hot_states(&self) -> Result<(), Error> { - let split = self.get_split_info(); - debug!( - self.log, - "Database state pruning started"; - "split_slot" => split.slot, - ); - let mut state_delete_batch = vec![]; - for res in self - .hot_db - .iter_column::(DBColumn::BeaconStateSummary) - { - let (state_root, summary_bytes) = res?; - let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?; - - if summary.slot <= split.slot { - let old = summary.slot < split.slot; - let non_canonical = summary.slot == split.slot - && state_root != split.state_root - && !split.state_root.is_zero(); - if old || non_canonical { - let reason = if old { - "old dangling state" - } else { - "non-canonical" - }; - debug!( - self.log, - "Deleting state"; - "state_root" => ?state_root, - "slot" => summary.slot, - "reason" => reason, - ); - state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot))); - } - } - } - let num_deleted_states = state_delete_batch.len(); - self.do_atomically_with_block_and_blobs_cache(state_delete_batch)?; - debug!( - self.log, - "Database state pruning complete"; - "num_deleted_states" => num_deleted_states, - ); - Ok(()) - } } /// Advance the split point of the store, moving new finalized states to the freezer. @@ -3058,10 +3224,7 @@ pub fn migrate_database, Cold: ItemStore>( return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); } - let mut hot_db_ops = vec![]; let mut cold_db_block_ops = vec![]; - let mut epoch_boundary_blocks = HashSet::new(); - let mut non_checkpoint_block_roots = HashSet::new(); // Iterate in descending order until the current split slot let state_roots = RootsIterator::new(&store, finalized_state) @@ -3073,14 +3236,6 @@ pub fn migrate_database, Cold: ItemStore>( // Then, iterate states in slot ascending order, as they are stored wrt previous states. for (block_root, state_root, slot) in state_roots.into_iter().rev() { - // Delete the execution payload if payload pruning is enabled. At a skipped slot we may - // delete the payload for the finalized block itself, but that's OK as we only guarantee - // that payloads are present for slots >= the split slot. The payload fetching code is also - // forgiving of missing payloads. - if store.config.prune_payloads { - hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); - } - // Store the slot to block root mapping. cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue( get_key_for_col( @@ -3090,25 +3245,6 @@ pub fn migrate_database, Cold: ItemStore>( block_root.as_slice().to_vec(), )); - // At a missed slot, `state_root_iter` will return the block root - // from the previous non-missed slot. This ensures that the block root at an - // epoch boundary is always a checkpoint block root. We keep track of block roots - // at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set. - // We then ensure that block roots at the epoch boundary aren't included in the - // `non_checkpoint_block_roots` hash set. - if slot % E::slots_per_epoch() == 0 { - epoch_boundary_blocks.insert(block_root); - } else { - non_checkpoint_block_roots.insert(block_root); - } - - if epoch_boundary_blocks.contains(&block_root) { - non_checkpoint_block_roots.remove(&block_root); - } - - // Delete the old summary, and the full state if we lie on an epoch boundary. - hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); - // Do not try to store states if a restore point is yet to be stored, or will never be // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). 
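For reference, the routine deleted above decided whether a hot state summary at or before the split could be dropped. Extracted as a pure predicate over simplified types (a sketch: `[u8; 32]` stands in for `Hash256`, and a zero split root models the `!split.state_root.is_zero()` guard), the criterion it implemented was:

// Sketch of the pruning criterion from the deleted `prune_old_hot_states`.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Root([u8; 32]);

impl Root {
    fn is_zero(&self) -> bool {
        self.0 == [0u8; 32]
    }
}

struct Split {
    slot: u64,
    state_root: Root,
}

/// Returns true if a hot state summary at (`summary_slot`, `state_root`) may be deleted.
fn should_prune(summary_slot: u64, state_root: Root, split: &Split) -> bool {
    // States strictly before the split are always stale ("old dangling state").
    let old = summary_slot < split.slot;
    // A state *at* the split slot is prunable only if it is not the canonical split state,
    // and only when the split state root is actually known (non-zero).
    let non_canonical = summary_slot == split.slot
        && state_root != split.state_root
        && !split.state_root.is_zero();
    old || non_canonical
}

fn main() {
    let canonical = Root([1u8; 32]);
    let other = Root([2u8; 32]);
    let split = Split { slot: 64, state_root: canonical };

    assert!(should_prune(63, other, &split)); // old dangling state
    assert!(should_prune(64, other, &split)); // non-canonical at the split slot
    assert!(!should_prune(64, canonical, &split)); // the split state itself is kept
    assert!(!should_prune(65, other, &split)); // states after the split are out of scope
}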
Make an exception for the genesis state // which always needs to be copied from the hot DB to the freezer and should not be deleted. @@ -3123,7 +3259,7 @@ pub fn migrate_database, Cold: ItemStore>( // Calling `store_cold_state_summary` instead of `store_cold_state` for those allows us // to skip loading many hot states. if matches!( - store.hierarchy.storage_strategy(slot)?, + store.cold_storage_strategy(slot)?, StorageStrategy::ReplayFrom(..) ) { // Store slot -> state_root and state_root -> slot mappings. @@ -3141,19 +3277,6 @@ pub fn migrate_database, Cold: ItemStore>( store.cold_db.do_atomically(cold_db_ops)?; } - // Prune sync committee branch data for all non checkpoint block roots. - // Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots - // as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case - // we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root. - // E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31) - // and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch` - // for a checkpoint block root. - non_checkpoint_block_roots - .into_iter() - .for_each(|block_root| { - hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(block_root)); - }); - // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezing // procedure. @@ -3201,9 +3324,6 @@ pub fn migrate_database, Cold: ItemStore>( *split_guard = split; } - // Delete the blocks and states from the hot database if we got this far. - store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?; - // Update the cache's view of the finalized state. store.update_finalized_state( finalized_state_root, @@ -3257,16 +3377,50 @@ fn no_state_root_iter() -> Option Self { + Self { slot, state_root } + } + + pub fn zero() -> Self { + Self { + slot: Slot::new(0), + state_root: Hash256::ZERO, + } + } + + pub fn get_root(&self, slot: Slot) -> Result { + if self.slot == slot { + Ok(self.state_root) + } else { + Err(Error::MissmatchDiffBaseStateRoot { + expected_slot: slot, + stored_slot: self.slot, + }) + } + } } impl StoreItem for HotStateSummary { fn db_column() -> DBColumn { - DBColumn::BeaconStateSummary + DBColumn::BeaconStateHotSummary } fn as_store_bytes(&self) -> Vec { @@ -3280,23 +3434,38 @@ impl StoreItem for HotStateSummary { impl HotStateSummary { /// Construct a new summary of the given state. - pub fn new(state_root: &Hash256, state: &BeaconState) -> Result { + pub fn new( + state_root: &Hash256, + state: &BeaconState, + storage_strategy: StorageStrategy, + ) -> Result { // Fill in the state root on the latest block header if necessary (this happens on all // slots where there isn't a skip). let latest_block_root = state.get_latest_block_root(*state_root); - let epoch_boundary_slot = state.slot() / E::slots_per_epoch() * E::slots_per_epoch(); - let epoch_boundary_state_root = if epoch_boundary_slot == state.slot() { - *state_root + + let get_state_root = |slot| { + if slot == state.slot() { + Ok(*state_root) + } else { + state + .get_state_root(slot) + .copied() + .map_err(HotColdDBError::HotStateSummaryError) + } + }; + let diff_base_slot = storage_strategy.diff_base_slot(); + let diff_base_state_root = if let Some(diff_base_slot) = diff_base_slot { + DiffBaseStateRoot::new(diff_base_slot, get_state_root(diff_base_slot)?) 
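`DiffBaseStateRoot::get_root` above acts as a guard: the caller states which slot it expects the diff base to live at, and a mismatch surfaces as an error rather than silently diffing against the wrong state. A standalone sketch of that contract, with simplified `Slot`/`Hash256` stand-ins and a local error type so it compiles on its own (the variant name mirrors the one added in this patch):

// Sketch of the diff-base guard: the stored (slot, state_root) pair is only handed out
// when the caller's expected slot matches.
type Slot = u64;
type Hash256 = [u8; 32];

#[derive(Debug)]
enum Error {
    MissmatchDiffBaseStateRoot { expected_slot: Slot, stored_slot: Slot },
}

#[derive(Debug, Clone, Copy)]
struct DiffBaseStateRoot {
    slot: Slot,
    state_root: Hash256,
}

impl DiffBaseStateRoot {
    fn new(slot: Slot, state_root: Hash256) -> Self {
        Self { slot, state_root }
    }

    /// Summaries with no diff base (e.g. genesis) point at slot 0 and a zero root.
    fn zero() -> Self {
        Self { slot: 0, state_root: [0u8; 32] }
    }

    /// Return the base state root, but only if it is stored at the slot the caller expects.
    fn get_root(&self, slot: Slot) -> Result<Hash256, Error> {
        if self.slot == slot {
            Ok(self.state_root)
        } else {
            Err(Error::MissmatchDiffBaseStateRoot {
                expected_slot: slot,
                stored_slot: self.slot,
            })
        }
    }
}

fn main() {
    let base = DiffBaseStateRoot::new(32, [7u8; 32]);
    assert!(base.get_root(32).is_ok());
    // Asking for the base at the wrong slot is an error, not a wrong answer.
    assert!(base.get_root(31).is_err());
    let _ = DiffBaseStateRoot::zero();
}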
} else { - *state - .get_state_root(epoch_boundary_slot) - .map_err(HotColdDBError::HotStateSummaryError)? + DiffBaseStateRoot::zero() }; Ok(HotStateSummary { slot: state.slot(), latest_block_root, - epoch_boundary_state_root, + diff_base_state_root, + // Note: if genesis state, it will point to its own state root + previous_state_root: get_state_root(state.slot().saturating_sub(1_u64))?, }) } } diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 48c289f2b2d..df51ad8ea9a 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -1,23 +1,8 @@ use crate::*; -use ssz::{DecodeError, Encode}; +use ssz::DecodeError; use ssz_derive::Encode; -pub fn store_full_state( - state_root: &Hash256, - state: &BeaconState, - ops: &mut Vec, -) -> Result<(), Error> { - let bytes = { - let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES); - StorageContainer::new(state).as_ssz_bytes() - }; - metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as u64); - metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); - let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); - ops.push(KeyValueStoreOp::PutKeyValue(key, bytes)); - Ok(()) -} - +// FIXME(tree-states): delete/move to migration pub fn get_full_state, E: EthSpec>( db: &KV, state_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 09ae9a32dd0..35489ec3f97 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -35,7 +35,10 @@ pub use self::leveldb_store::LevelDB; pub use self::memory_store::MemoryStore; pub use crate::metadata::BlobInfo; pub use errors::Error; -pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; +pub use impls::{ + beacon_state::get_full_state as get_full_state_v22, + beacon_state::StorageContainer as BeaconStateStorageContainer, +}; pub use metadata::AnchorInfo; pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; @@ -90,7 +93,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { // i.e. entries being created and deleted. for column in [ DBColumn::BeaconState, - DBColumn::BeaconStateSummary, + DBColumn::BeaconStateHotSummary, DBColumn::BeaconBlock, ] { self.compact_column(column)?; @@ -262,20 +265,40 @@ pub enum DBColumn { #[strum(serialize = "bdc")] BeaconDataColumn, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). + /// + /// DEPRECATED. #[strum(serialize = "ste")] BeaconState, + /// For compact `BeaconStateDiff`'s in the hot DB. + /// + /// hsd = Hot State Diff. + #[strum(serialize = "hsd")] + BeaconStateHotDiff, + /// For beacon state snapshots in the hot DB. + /// + /// hsn = Hot Snapshot. + #[strum(serialize = "hsn")] + BeaconStateHotSnapshot, /// For beacon state snapshots in the freezer DB. #[strum(serialize = "bsn")] BeaconStateSnapshot, /// For compact `BeaconStateDiff`s in the freezer DB. #[strum(serialize = "bsd")] BeaconStateDiff, - /// Mapping from state root to `HotStateSummary` in the hot DB. + /// DEPRECATED + /// + /// Mapping from state root to `HotStateSummaryV22` in the hot DB. /// /// Previously this column also served a role in the freezer DB, mapping state roots to /// `ColdStateSummary`. However that role is now filled by `BeaconColdStateSummary`. #[strum(serialize = "bss")] BeaconStateSummary, + /// Mapping from state root to `HotStateSummaryV23` in the hot DB. 
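The `get_state_root` closure above covers two cases: the summary's own slot, whose root must be passed in because a state cannot contain its own root, and any older slot, which is read from the state's recent state-roots history (used here for both the diff base root and `previous_state_root`). A sketch of that lookup against a toy history vector; the indexing is simplified to a plain bounds check and all names are illustrative.

// Sketch: resolve a state root for `slot` either from the caller (own slot) or from the
// state's recent state-roots history, most recent first.
type Slot = u64;
type Hash256 = [u8; 32];

struct ToyState {
    slot: Slot,
    // recent_state_roots[i] holds the root of the state at `slot - 1 - i`.
    recent_state_roots: Vec<Hash256>,
}

#[derive(Debug)]
enum Error {
    SlotOutOfRange(Slot),
}

fn get_state_root(state: &ToyState, own_root: Hash256, slot: Slot) -> Result<Hash256, Error> {
    if slot == state.slot {
        // A state does not contain its own root; the caller must supply it.
        Ok(own_root)
    } else {
        // Older roots come from the state's history; future slots are an error.
        let distance = state
            .slot
            .checked_sub(slot)
            .ok_or(Error::SlotOutOfRange(slot))? as usize;
        state
            .recent_state_roots
            .get(distance - 1)
            .copied()
            .ok_or(Error::SlotOutOfRange(slot))
    }
}

fn main() {
    let state = ToyState {
        slot: 10,
        recent_state_roots: vec![[9u8; 32], [8u8; 32]],
    };
    let own_root = [10u8; 32];
    // Own slot: use the supplied root.
    assert_eq!(get_state_root(&state, own_root, 10).unwrap(), [10u8; 32]);
    // Previous slot, as used for `previous_state_root` in the summary.
    assert_eq!(get_state_root(&state, own_root, 9).unwrap(), [9u8; 32]);
    // Too far back for this toy history.
    assert!(get_state_root(&state, own_root, 7).is_err());
}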
+ /// + /// This column is populated after DB schema version 23 superseding `BeaconStateSummary`. The + /// new column is necessary to have a safe migration without data loss. + #[strum(serialize = "bs3")] + BeaconStateHotSummary, /// Mapping from state root to `ColdStateSummary` in the cold DB. #[strum(serialize = "bcs")] BeaconColdStateSummary, @@ -377,6 +400,9 @@ impl DBColumn { | Self::BeaconState | Self::BeaconBlob | Self::BeaconStateSummary + | Self::BeaconStateHotDiff + | Self::BeaconStateHotSnapshot + | Self::BeaconStateHotSummary | Self::BeaconColdStateSummary | Self::BeaconStateTemporary | Self::ExecPayload diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 3f076a767ac..5b456cae59c 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -2,9 +2,9 @@ use crate::{DBColumn, Error, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use types::{Checkpoint, Hash256, Slot}; +use types::{Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23); // All the keys that get stored under the `BeaconMeta` column. // @@ -12,7 +12,8 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22); pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0); pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1); pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2); -pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3); +// DEPRECATED +// pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3); pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4); pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5); pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6); @@ -65,30 +66,6 @@ impl StoreItem for SchemaVersion { } } -/// The checkpoint used for pruning the database. -/// -/// Updated whenever pruning is successful. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct PruningCheckpoint { - pub checkpoint: Checkpoint, -} - -impl StoreItem for PruningCheckpoint { - fn db_column() -> DBColumn { - DBColumn::BeaconMeta - } - - fn as_store_bytes(&self) -> Vec { - self.checkpoint.as_ssz_bytes() - } - - fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(PruningCheckpoint { - checkpoint: Checkpoint::from_ssz_bytes(bytes)?, - }) - } -} - /// The last time the database was compacted. pub struct CompactionTimestamp(pub u64); @@ -111,7 +88,8 @@ impl StoreItem for CompactionTimestamp { pub struct AnchorInfo { /// The slot at which the anchor state is present and which we cannot revert. Values on start: /// - Genesis start: 0 - /// - Checkpoint sync: Slot of the finalized checkpoint block + /// - Checkpoint sync: Slot of the finalized state advanced to the checkpoint epoch + /// - Existing DB prior to v23: Finalized state slot at the migration moment /// /// Immutable pub anchor_slot: Slot,
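Bumping `CURRENT_SCHEMA_VERSION` to 23 means a node compares the on-disk version against the binary's version on startup and applies the corresponding upgrade, or refuses a version it does not know how to handle. A minimal sketch of that gating logic; `migrate_v22_to_v23` is a hypothetical stub standing in for the real `migration_schema_v23` module, and the version arithmetic is simplified to a single step.

// Sketch of schema-version gating at startup: upgrade one step, no-op when equal,
// reject versions this binary does not support.
const CURRENT_SCHEMA_VERSION: u64 = 23;

#[derive(Debug)]
enum Error {
    UnsupportedSchemaVersion { on_disk: u64, supported: u64 },
}

// Hypothetical stand-in for the v22 -> v23 migration (summaries rewritten into the new
// `BeaconStateHotSummary` column, pruning checkpoint key retired, etc.).
fn migrate_v22_to_v23() -> Result<(), Error> {
    Ok(())
}

fn migrate_schema(on_disk: u64) -> Result<u64, Error> {
    match on_disk {
        // Already up to date: nothing to do.
        v if v == CURRENT_SCHEMA_VERSION => Ok(v),
        // One step behind: run the single upgrade and record the new version.
        22 => {
            migrate_v22_to_v23()?;
            Ok(CURRENT_SCHEMA_VERSION)
        }
        // Anything else (older than 22, or newer than this binary) is not handled here.
        v => Err(Error::UnsupportedSchemaVersion {
            on_disk: v,
            supported: CURRENT_SCHEMA_VERSION,
        }),
    }
}

fn main() {
    assert_eq!(migrate_schema(23).unwrap(), 23);
    assert_eq!(migrate_schema(22).unwrap(), 23);
    assert!(migrate_schema(21).is_err());
}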