Hierarchical state diffs in hot DB #6750

Open · wants to merge 5 commits into base: drop-headtracker
4 changes: 2 additions & 2 deletions account_manager/src/validator/slashing_protection.rs
@@ -90,7 +90,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database =
SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
@@ -198,7 +198,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path)
.map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
85 changes: 41 additions & 44 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -91,7 +91,7 @@ use std::fmt::Debug;
use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use store::{Error as DBError, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
@@ -1455,52 +1455,49 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64());
for _ in 0..distance {
let state_root = if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
let state_root =
if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
vec![
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
chain
.store
.do_atomically_with_block_and_blobs_cache(state_batch)?;
drop(txn_lock);
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
// TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to
// check that the summary exists at all? Are double writes common? Can this txn
// lock deadlock with the `do_atomically` call?
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
} else {
let mut ops = vec![];
// Recycle store codepath to create a state summary and store the state / diff
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
// Additionally write a temporary flag as part of the atomic write
ops.extend(chain.store.convert_to_kv_batch(vec![
StoreOp::PutStateTemporaryFlag(state_root),
])?);
chain.store.hot_db.do_atomically(ops)?;
}
drop(txn_lock);

confirmed_state_roots.push(state_root);
confirmed_state_roots.push(state_root);

state_root
};
state_root
};

if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
// Expose Prometheus metrics.
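Aside on the hunk above: rather than choosing between `PutState` and `PutStateSummary` inline, block import now defers to `store_hot_state`, which decides whether the state is persisted as a full snapshot or as a hierarchical diff, and folds the temporary flag into the same atomic batch. A minimal sketch of that pattern, assuming the store API shown in the diff (`load_hot_state_summary`, `store_hot_state`, `convert_to_kv_batch`, `do_atomically`); the helper name and signature are hypothetical:

```rust
use store::{Error, HotColdDB, ItemStore, KeyValueStore, StoreOp};
use types::{BeaconState, EthSpec, Hash256};

/// Hypothetical helper: stage a not-yet-finalized state in the hot DB, marked temporary.
fn stage_temporary_state<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    store: &HotColdDB<E, Hot, Cold>,
    state_root: Hash256,
    state: &BeaconState<E>,
) -> Result<(), Error> {
    // Hold the transaction lock so no other writer races us between the existence
    // check and the write.
    let txn_lock = store.hot_db.begin_rw_transaction();

    if store.load_hot_state_summary(&state_root)?.is_none() {
        let mut ops = vec![];
        // Let the store decide snapshot vs. diff and stage the corresponding writes.
        store.store_hot_state(&state_root, state, &mut ops)?;
        // Stage the temporary flag in the same batch; it is deleted later as part of
        // the block-import atomic write.
        ops.extend(store.convert_to_kv_batch(vec![StoreOp::PutStateTemporaryFlag(state_root)])?);
        store.hot_db.do_atomically(ops)?;
    }

    drop(txn_lock);
    Ok(())
}
```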
61 changes: 39 additions & 22 deletions beacon_node/beacon_chain/src/builder.rs
@@ -38,8 +38,8 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended,
Hash256, Signature, SignedBeaconBlock, Slot,
};

/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -386,21 +386,28 @@ where
}

/// Starts a new chain from a genesis state.
pub fn genesis_state(mut self, beacon_state: BeaconState<E>) -> Result<Self, String> {
pub fn genesis_state(mut self, mut beacon_state: BeaconState<E>) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;

// Initialize anchor info before attempting to write the genesis state
let retain_historic_states = self.chain_config.reconstruct_historic_states;
let genesis_beacon_block = genesis_block(&mut beacon_state, &self.spec)?;
self.pending_io_batch.push(
store
.init_anchor_info(
genesis_beacon_block.message().parent_root(),
Slot::new(0),
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);

let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;

// Stage the database's metadata fields for atomic storage when `build` is called.
// Since v4.4.0 we will set the anchor with a dummy state upper limit in order to prevent
// historic states from being retained (unless `--reconstruct-historic-states` is set).
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_blob_info(genesis.beacon_block.slot())
@@ -518,6 +525,14 @@ where
}
}

debug!(
log,
"Storing split from weak subjectivity state";
"slot" => weak_subj_slot,
"state_root" => ?weak_subj_state_root,
"block_root" => ?weak_subj_block_root,
);

// Set the store's split point *before* storing genesis so that genesis is stored
// immediately in the freezer DB.
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
@@ -539,6 +554,19 @@ where
.do_atomically(block_root_batch)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;

// Write the anchor to memory before calling `put_state`, otherwise hot hdiff can't store
// states that do not align with the start_slot grid.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(
weak_subj_block.message().parent_root(),
weak_subj_slot,
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);

// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
@@ -548,6 +576,8 @@ where
weak_subj_state.clone(),
)
.map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?;
// Note: post hot hdiff, the anchor info must be updated before attempting to `put_state`, otherwise
// the write will fail if the weak_subj_slot is not aligned with the snapshot moduli.
store
.put_state(&weak_subj_state_root, &weak_subj_state)
.map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?;
@@ -563,13 +593,7 @@ where
// Stage the database's metadata fields for atomic storage when `build` is called.
// This prevents the database from restarting in an inconsistent state if the anchor
// info or split point is written before the `PersistedBeaconChain`.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(store.store_split_in_batch());
self.pending_io_batch.push(
store
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_blob_info(weak_subj_block.slot())
@@ -581,13 +605,6 @@ where
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
);

// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch
.push(store.pruning_checkpoint_store_op(Checkpoint {
root: weak_subj_block_root,
epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()),
}));

let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
beacon_block: Arc::new(weak_subj_block),
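The two notes above describe the same constraint introduced by hot hdiff: the anchor info fixes the start slot of the hot diff grid, so it must be initialised (at least in memory) before `put_state` is called for the checkpoint state, otherwise a state whose slot is not aligned with the snapshot moduli cannot be stored. A hedged sketch of that ordering, reusing the `init_anchor_info` signature from the diff; the helper itself and its error handling are illustrative only:

```rust
use store::{HotColdDB, ItemStore, KeyValueStore};
use types::{BeaconState, EthSpec, Hash256, Slot};

/// Illustrative helper: store a checkpoint state under hot hdiff rules.
fn store_checkpoint_state<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    store: &HotColdDB<E, Hot, Cold>,
    anchor_parent_root: Hash256,
    anchor_slot: Slot,
    retain_historic_states: bool,
    state_root: Hash256,
    state: &BeaconState<E>,
) -> Result<(), String> {
    // 1. Initialise the anchor first. Per the comments in the diff this updates the
    //    in-memory anchor (fixing the diff grid's start slot) and returns a KV op for
    //    later persistence.
    let anchor_op = store
        .init_anchor_info(anchor_parent_root, anchor_slot, retain_historic_states)
        .map_err(|e| format!("Failed to initialize anchor info: {e:?}"))?;

    // 2. Only now can the store express a non-aligned state as a diff against the
    //    anchor-aligned layer points.
    store
        .put_state(&state_root, state)
        .map_err(|e| format!("Failed to store checkpoint state: {e:?}"))?;

    // 3. Persist the anchor op (in the real builder this goes into `pending_io_batch`
    //    and is committed atomically in `build`).
    store
        .hot_db
        .do_atomically(vec![anchor_op])
        .map_err(|e| format!("Failed to persist anchor info: {e:?}"))?;
    Ok(())
}
```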
52 changes: 37 additions & 15 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -1,11 +1,11 @@
use crate::errors::BeaconChainError;
use crate::summaries_dag::{
BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error as SummariesDagError,
BlockSummariesDAG, DAGBlockSummary, DAGStateSummary, Error as SummariesDagError,
StateSummariesDAG,
};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
@@ -462,7 +462,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
new_finalized_checkpoint: Checkpoint,
log: &Logger,
) -> Result<PruningOutcome, BeaconChainError> {
let split_state_root = store.get_split_info().state_root;
let new_finalized_slot = new_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
@@ -494,7 +493,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.load_hot_state_summaries()?
.into_iter()
.map(|(state_root, summary)| (state_root, summary.into()))
.collect::<Vec<(Hash256, DAGStateSummaryV22)>>();
.collect::<Vec<(Hash256, DAGStateSummary)>>();

// De-duplicate block roots to reduce block reads below
let summary_block_roots = HashSet::<Hash256>::from_iter(
@@ -528,18 +527,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
})
.collect::<Result<Vec<_>, BeaconChainError>>()?;

let parent_block_roots = blocks
.iter()
.map(|(block_root, block)| (*block_root, block.parent_root))
.collect::<HashMap<Hash256, Hash256>>();

(
StateSummariesDAG::new_from_v22(
state_summaries,
parent_block_roots,
split_state_root,
)
.map_err(PruningError::SummariesDagError)?,
StateSummariesDAG::new(state_summaries),
BlockSummariesDAG::new(&blocks),
)
};
@@ -585,10 +574,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.min()
.ok_or(PruningError::EmptyFinalizedBlocks)?;

// Compute the set of finalized state slots that we must keep to make the dynamic HDiff system
// work.
let required_finalized_diff_state_slots = store
.hierarchy_hot
.closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot()?);

// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
let mut kept_summaries_for_hdiff = vec![];

for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
for (state_root, summary) in summaries {
@@ -597,6 +593,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Keep this state if it is the post state of a viable head, or a state advance from a
// viable head.
false
} else if required_finalized_diff_state_slots.contains(&slot) {
// Keep this state and its diff, as it's necessary for the finalized portion of the
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
// each diff layer; checking `newly_finalized_state_roots` ensures we only keep
// states on the finalized canonical chain and avoid lingering forks.

// In the diagram below, `o` are diffs by slot that we must keep. In the prior
// finalized section there's only one chain, so we preserve them unconditionally.
// For the newly finalized chain, we check which states are canonical and only keep
// those. For slots below `newly_finalized_states_min_slot` we don't have canonical
// information, so we assume they are part of the already-pruned finalized chain.
//
// /-----o----
// o-------o------/-------o----
if slot < newly_finalized_states_min_slot
|| newly_finalized_state_roots.contains(&state_root)
{
// Track kept summaries to debug hdiff inconsistencies with "Extra pruning information"
kept_summaries_for_hdiff.push((state_root, slot));
false
} else {
true
}
} else {
// Everything else, prune
true
@@ -650,6 +670,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
"newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
"newly_finalized_state_roots" => newly_finalized_state_roots.len(),
"newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
"required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots,
"kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff,
"state_summaries_count" => state_summaries_dag.summaries_count(),
"finalized_and_descendant_block_roots" => finalized_and_descendant_block_roots.len(),
"blocks_to_prune_count" => blocks_to_prune.len(),
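`closest_layer_points` is what turns the retention rule above into a concrete set of slots: for the new finalized slot it returns, per diff layer, the closest grid point at or below that slot relative to the hot hdiff start slot, and states/diffs at those slots must survive pruning so later hot diffs can still be applied. The sketch below is only an assumed model of that behaviour, with made-up layer exponents, not the actual implementation in the hierarchy config:

```rust
/// Assumed semantics: one grid point per diff layer, snapped down to the layer's
/// spacing (2^exponent slots), anchored at the hdiff start slot.
fn closest_layer_points(layer_exponents: &[u64], finalized_slot: u64, start_slot: u64) -> Vec<u64> {
    layer_exponents
        .iter()
        .map(|exp| {
            let layer_span = 1u64 << exp; // diffs on this layer are 2^exp slots apart
            start_slot + ((finalized_slot - start_slot) / layer_span) * layer_span
        })
        .collect()
}

fn main() {
    // With hypothetical layers of 2^5, 2^9 and 2^13 slots anchored at slot 0, finalizing
    // slot 10_000 means the diffs at slots 9984, 9728 and 8192 must be kept.
    assert_eq!(closest_layer_points(&[5, 9, 13], 10_000, 0), vec![9984, 9728, 8192]);
}
```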
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
@@ -2,6 +2,7 @@
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;
mod migration_schema_v23;

use crate::beacon_chain::BeaconChainTypes;
use slog::Logger;
@@ -59,6 +60,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(23), SchemaVersion(22)) => {
let ops = migration_schema_v23::downgrade_to_v22::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
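For the new v22↔v23 arms above, the migration functions only assemble key-value operations; `store_schema_version_atomically` then commits those ops together with the version bump, so a crash mid-migration can't leave the hot DB with v23 data under a v22 version key (or vice versa). A hedged sketch of the expected shape of `upgrade_to_v23`, with the actual rewriting of hot state summaries/diffs elided:

```rust
use crate::beacon_chain::BeaconChainTypes;
use slog::{info, Logger};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp};

/// Sketch only: returns the staged ops, does not write anything itself. The real
/// migration body (rewriting hot state summaries for the hierarchical-diff layout)
/// lives in `migration_schema_v23` and is not reproduced here.
pub fn upgrade_to_v23<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
    log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
    info!(log, "Upgrading from schema v22 to v23");
    let ops = vec![];
    // ... stage rewrites of existing hot summaries / diffs into `ops` here ...
    let _ = db; // placeholder so the sketch compiles without the real migration body
    Ok(ops)
}
```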