From 29ad4fcabe69a95e4491f2080e79ba9291aa9c9e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 28 Dec 2024 21:33:09 +0800 Subject: [PATCH] Drop head tracker for summaries dag --- beacon_node/beacon_chain/src/beacon_chain.rs | 54 +- beacon_node/beacon_chain/src/builder.rs | 22 +- .../beacon_chain/src/canonical_head.rs | 1 - beacon_node/beacon_chain/src/head_tracker.rs | 214 ------- beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_chain/src/migrate.rs | 557 +++++++++--------- .../src/persisted_beacon_chain.rs | 11 +- beacon_node/beacon_chain/src/summaries_dag.rs | 353 +++++++++++ beacon_node/store/src/hot_cold_store.rs | 12 + consensus/proto_array/src/proto_array.rs | 31 + .../src/proto_array_fork_choice.rs | 5 + 11 files changed, 708 insertions(+), 554 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/head_tracker.rs create mode 100644 beacon_node/beacon_chain/src/summaries_dag.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 80766d57b33..428fbe57a27 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -33,7 +33,6 @@ use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::graffiti_calculator::GraffitiCalculator; -use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker}; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -456,8 +455,6 @@ pub struct BeaconChain { /// A handler for events generated by the beacon chain. This is only initialized when the /// HTTP server is enabled. pub event_handler: Option>, - /// Used to track the heads of the beacon chain. - pub(crate) head_tracker: Arc, /// Caches the attester shuffling for a given epoch and shuffling key root. pub shuffling_cache: RwLock, /// A cache of eth1 deposit data at epoch boundaries for deposit finalization @@ -618,50 +615,30 @@ impl BeaconChain { pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> { let mut batch = vec![]; - let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); - - // Hold a lock to head_tracker until it has been persisted to disk. Otherwise there's a race - // condition with the pruning thread which can result in a block present in the head tracker - // but absent in the DB. This inconsistency halts pruning and dramastically increases disk - // size. Ref: https://github.com/sigp/lighthouse/issues/4773 - let head_tracker = self.head_tracker.0.read(); - batch.push(self.persist_head_in_batch(&head_tracker)); - let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); batch.push(self.persist_fork_choice_in_batch()); self.store.hot_db.do_atomically(batch)?; - drop(head_tracker); Ok(()) } /// Return a `PersistedBeaconChain` without reference to a `BeaconChain`. 
-    pub fn make_persisted_head(
-        genesis_block_root: Hash256,
-        head_tracker_reader: &HeadTrackerReader,
-    ) -> PersistedBeaconChain {
+    pub fn make_persisted_head(genesis_block_root: Hash256) -> PersistedBeaconChain {
         PersistedBeaconChain {
             _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
             genesis_block_root,
-            ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader),
+            ssz_head_tracker: <_>::default(),
         }
     }
 
     /// Return a database operation for writing the beacon chain head to disk.
-    pub fn persist_head_in_batch(
-        &self,
-        head_tracker_reader: &HeadTrackerReader,
-    ) -> KeyValueStoreOp {
-        Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader)
+    pub fn persist_head_in_batch(&self) -> KeyValueStoreOp {
+        Self::persist_head_in_batch_standalone(self.genesis_block_root)
     }
 
-    pub fn persist_head_in_batch_standalone(
-        genesis_block_root: Hash256,
-        head_tracker_reader: &HeadTrackerReader,
-    ) -> KeyValueStoreOp {
-        Self::make_persisted_head(genesis_block_root, head_tracker_reader)
-            .as_kv_store_op(BEACON_CHAIN_DB_KEY)
+    pub fn persist_head_in_batch_standalone(genesis_block_root: Hash256) -> KeyValueStoreOp {
+        Self::make_persisted_head(genesis_block_root).as_kv_store_op(BEACON_CHAIN_DB_KEY)
     }
 
     /// Load fork choice from disk, returning `None` if it isn't found.
@@ -1405,12 +1382,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     ///
     /// Returns `(block_root, block_slot)`.
     pub fn heads(&self) -> Vec<(Hash256, Slot)> {
-        self.head_tracker.heads()
+        let head_slot = self.canonical_head.cached_head().head_slot();
+        self.canonical_head
+            .fork_choice_read_lock()
+            .proto_array()
+            .viable_heads::<T::EthSpec>(head_slot)
+            .iter()
+            .map(|node| (node.root, node.slot))
+            .collect()
     }
 
     /// Only used in tests.
     pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
-        self.head_tracker.contains_head((*block_hash).into())
+        self.heads()
+            .iter()
+            .any(|head| head.0 == Into::<Hash256>::into(*block_hash))
     }
 
     /// Returns the `BeaconState` at the given slot.
@@ -3989,9 +3975,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // about it.
let block_time_imported = timestamp_now(); - let parent_root = block.parent_root(); - let slot = block.slot(); - let current_eth1_finalization_data = Eth1FinalizationData { eth1_data: state.eth1_data().clone(), eth1_deposit_index: state.eth1_deposit_index(), @@ -4012,9 +3995,6 @@ impl BeaconChain { }); } - self.head_tracker - .register_block(block_root, parent_root, slot); - metrics::stop_timer(db_write_timer); metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 9d99ff9d8e0..ab3bffb0ff3 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -8,7 +8,6 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; -use crate::head_tracker::HeadTracker; use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; @@ -91,7 +90,6 @@ pub struct BeaconChainBuilder { slot_clock: Option, shutdown_sender: Option>, light_client_server_tx: Option>>, - head_tracker: Option, validator_pubkey_cache: Option>, spec: Arc, chain_config: ChainConfig, @@ -135,7 +133,6 @@ where slot_clock: None, shutdown_sender: None, light_client_server_tx: None, - head_tracker: None, validator_pubkey_cache: None, spec: Arc::new(E::default_spec()), chain_config: ChainConfig::default(), @@ -324,10 +321,6 @@ where self.genesis_block_root = Some(chain.genesis_block_root); self.genesis_state_root = Some(genesis_block.state_root()); - self.head_tracker = Some( - HeadTracker::from_ssz_container(&chain.ssz_head_tracker) - .map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?, - ); self.validator_pubkey_cache = Some(pubkey_cache); self.fork_choice = Some(fork_choice); @@ -724,7 +717,6 @@ where .genesis_state_root .ok_or("Cannot build without a genesis state root")?; let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); - let head_tracker = Arc::new(self.head_tracker.unwrap_or_default()); let beacon_proposer_cache: Arc> = <_>::default(); let mut validator_monitor = ValidatorMonitor::new( @@ -769,8 +761,6 @@ where &log, )?; - // Update head tracker. - head_tracker.register_block(block_root, block.parent_root(), block.slot()); (block_root, block, true) } Err(e) => return Err(descriptive_db_error("head block", &e)), @@ -826,12 +816,7 @@ where })?; let migrator_config = self.store_migrator_config.unwrap_or_default(); - let store_migrator = BackgroundMigrator::new( - store.clone(), - migrator_config, - genesis_block_root, - log.clone(), - ); + let store_migrator = BackgroundMigrator::new(store.clone(), migrator_config, log.clone()); if let Some(slot) = slot_clock.now() { validator_monitor.process_valid_state( @@ -856,11 +841,10 @@ where // // This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance // doesn't write a `PersistedBeaconChain` without the rest of the batch. 
- let head_tracker_reader = head_tracker.0.read(); self.pending_io_batch.push(BeaconChain::< Witness, >::persist_head_in_batch_standalone( - genesis_block_root, &head_tracker_reader + genesis_block_root )); self.pending_io_batch.push(BeaconChain::< Witness, @@ -871,7 +855,6 @@ where .hot_db .do_atomically(self.pending_io_batch) .map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?; - drop(head_tracker_reader); let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root(); let genesis_time = head_snapshot.beacon_state.genesis_time(); @@ -952,7 +935,6 @@ where fork_choice_signal_tx, fork_choice_signal_rx, event_handler: self.event_handler, - head_tracker, shuffling_cache: RwLock::new(ShufflingCache::new( shuffling_cache_size, head_shuffling_ids, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 4f92f5ec8f9..9ed8c9c7430 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -992,7 +992,6 @@ impl BeaconChain { self.store_migrator.process_finalization( new_finalized_state_root.into(), new_view.finalized_checkpoint, - self.head_tracker.clone(), )?; // Prune blobs in the background. diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs deleted file mode 100644 index 9c06ef33a18..00000000000 --- a/beacon_node/beacon_chain/src/head_tracker.rs +++ /dev/null @@ -1,214 +0,0 @@ -use parking_lot::{RwLock, RwLockReadGuard}; -use ssz_derive::{Decode, Encode}; -use std::collections::HashMap; -use types::{Hash256, Slot}; - -#[derive(Debug, PartialEq)] -pub enum Error { - MismatchingLengths { roots_len: usize, slots_len: usize }, -} - -/// Maintains a list of `BeaconChain` head block roots and slots. -/// -/// Each time a new block is imported, it should be applied to the `Self::register_block` function. -/// In order for this struct to be effective, every single block that is imported must be -/// registered here. -#[derive(Default, Debug)] -pub struct HeadTracker(pub RwLock>); - -pub type HeadTrackerReader<'a> = RwLockReadGuard<'a, HashMap>; - -impl HeadTracker { - /// Register a block with `Self`, so it may or may not be included in a `Self::heads` call. - /// - /// This function assumes that no block is imported without its parent having already been - /// imported. It cannot detect an error if this is not the case, it is the responsibility of - /// the upstream user. - pub fn register_block(&self, block_root: Hash256, parent_root: Hash256, slot: Slot) { - let mut map = self.0.write(); - map.remove(&parent_root); - map.insert(block_root, slot); - } - - /// Returns true iff `block_root` is a recognized head. - pub fn contains_head(&self, block_root: Hash256) -> bool { - self.0.read().contains_key(&block_root) - } - - /// Returns the list of heads in the chain. - pub fn heads(&self) -> Vec<(Hash256, Slot)> { - self.0 - .read() - .iter() - .map(|(root, slot)| (*root, *slot)) - .collect() - } - - /// Returns a `SszHeadTracker`, which contains all necessary information to restore the state - /// of `Self` at some later point. - /// - /// Should ONLY be used for tests, due to the potential for database races. - /// - /// See - #[cfg(test)] - pub fn to_ssz_container(&self) -> SszHeadTracker { - SszHeadTracker::from_map(&self.0.read()) - } - - /// Creates a new `Self` from the given `SszHeadTracker`, restoring `Self` to the same state of - /// the `Self` that created the `SszHeadTracker`. 
- pub fn from_ssz_container(ssz_container: &SszHeadTracker) -> Result { - let roots_len = ssz_container.roots.len(); - let slots_len = ssz_container.slots.len(); - - if roots_len != slots_len { - Err(Error::MismatchingLengths { - roots_len, - slots_len, - }) - } else { - let map = ssz_container - .roots - .iter() - .zip(ssz_container.slots.iter()) - .map(|(root, slot)| (*root, *slot)) - .collect::>(); - - Ok(Self(RwLock::new(map))) - } - } -} - -impl PartialEq for HeadTracker { - fn eq(&self, other: &HeadTracker) -> bool { - *self.0.read() == *other.0.read() - } -} - -/// Helper struct that is used to encode/decode the state of the `HeadTracker` as SSZ bytes. -/// -/// This is used when persisting the state of the `BeaconChain` to disk. -#[derive(Encode, Decode, Clone)] -pub struct SszHeadTracker { - roots: Vec, - slots: Vec, -} - -impl SszHeadTracker { - pub fn from_map(map: &HashMap) -> Self { - let (roots, slots) = map.iter().map(|(hash, slot)| (*hash, *slot)).unzip(); - SszHeadTracker { roots, slots } - } -} - -#[cfg(test)] -mod test { - use super::*; - use ssz::{Decode, Encode}; - use types::{BeaconBlock, EthSpec, FixedBytesExtended, MainnetEthSpec}; - - type E = MainnetEthSpec; - - #[test] - fn block_add() { - let spec = &E::default_spec(); - - let head_tracker = HeadTracker::default(); - - for i in 0..16 { - let mut block: BeaconBlock = BeaconBlock::empty(spec); - let block_root = Hash256::from_low_u64_be(i); - - *block.slot_mut() = Slot::new(i); - *block.parent_root_mut() = if i == 0 { - Hash256::random() - } else { - Hash256::from_low_u64_be(i - 1) - }; - - head_tracker.register_block(block_root, block.parent_root(), block.slot()); - } - - assert_eq!( - head_tracker.heads(), - vec![(Hash256::from_low_u64_be(15), Slot::new(15))], - "should only have one head" - ); - - let mut block: BeaconBlock = BeaconBlock::empty(spec); - let block_root = Hash256::from_low_u64_be(42); - *block.slot_mut() = Slot::new(15); - *block.parent_root_mut() = Hash256::from_low_u64_be(14); - head_tracker.register_block(block_root, block.parent_root(), block.slot()); - - let heads = head_tracker.heads(); - - assert_eq!(heads.len(), 2, "should only have two heads"); - assert!( - heads - .iter() - .any(|(root, slot)| *root == Hash256::from_low_u64_be(15) && *slot == Slot::new(15)), - "should contain first head" - ); - assert!( - heads - .iter() - .any(|(root, slot)| *root == Hash256::from_low_u64_be(42) && *slot == Slot::new(15)), - "should contain second head" - ); - } - - #[test] - fn empty_round_trip() { - let non_empty = HeadTracker::default(); - for i in 0..16 { - non_empty.0.write().insert(Hash256::random(), Slot::new(i)); - } - let bytes = non_empty.to_ssz_container().as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Ok(non_empty), - "non_empty should pass round trip" - ); - } - - #[test] - fn non_empty_round_trip() { - let non_empty = HeadTracker::default(); - for i in 0..16 { - non_empty.0.write().insert(Hash256::random(), Slot::new(i)); - } - let bytes = non_empty.to_ssz_container().as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Ok(non_empty), - "non_empty should pass round trip" - ); - } - - #[test] - fn bad_length() { - let container = SszHeadTracker { - roots: vec![Hash256::random()], - slots: vec![], - }; - let bytes = container.as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - 
&SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Err(Error::MismatchingLengths { - roots_len: 1, - slots_len: 0 - }), - "should fail decoding with bad lengths" - ); - } -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d9728b9fd41..66d4fc26f98 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -32,7 +32,6 @@ pub mod fetch_blobs; pub mod fork_choice_signal; pub mod fork_revert; pub mod graffiti_calculator; -mod head_tracker; pub mod historical_blocks; pub mod kzg_utils; pub mod light_client_finality_update_verification; @@ -54,6 +53,7 @@ pub mod proposer_prep_service; pub mod schema_change; pub mod shuffling_cache; pub mod state_advance_timer; +pub mod summaries_dag; pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index bc4b8e1ed86..969f4206f36 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -1,7 +1,8 @@ -use crate::beacon_chain::BEACON_CHAIN_DB_KEY; use crate::errors::BeaconChainError; -use crate::head_tracker::{HeadTracker, SszHeadTracker}; -use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; +use crate::summaries_dag::{ + BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error as SummariesDagError, + StateSummariesDAG, +}; use parking_lot::Mutex; use slog::{debug, error, info, warn, Logger}; use std::collections::{HashMap, HashSet}; @@ -10,13 +11,9 @@ use std::sync::{mpsc, Arc}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::{migrate_database, HotColdDBError}; -use store::iter::RootsIterator; -use store::{Error, ItemStore, StoreItem, StoreOp}; +use store::{Error, ItemStore, StoreOp}; pub use store::{HotColdDB, MemoryStore}; -use types::{ - BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, FixedBytesExtended, - Hash256, SignedBeaconBlockHash, Slot, -}; +use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot}; /// Compact at least this frequently, finalization permitting (7 days). const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800; @@ -42,8 +39,6 @@ pub struct BackgroundMigrator, Cold: ItemStore> prev_migration: Arc>, #[allow(clippy::type_complexity)] tx_thread: Option, thread::JoinHandle<()>)>>, - /// Genesis block root, for persisting the `PersistedBeaconChain`. - genesis_block_root: Hash256, log: Logger, } @@ -90,7 +85,7 @@ pub struct PrevMigration { pub enum PruningOutcome { /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`. Successful { - old_finalized_checkpoint: Checkpoint, + old_finalized_checkpoint_epoch: Epoch, }, /// The run was aborted because the new finalized checkpoint is older than the previous one. OutOfOrderFinalization { @@ -117,6 +112,11 @@ pub enum PruningError { }, UnexpectedEqualStateRoots, UnexpectedUnequalStateRoots, + MissingSummaryForFinalizedCheckpoint(Hash256), + MissingBlindedBlock(Hash256), + SummariesDagError(SummariesDagError), + EmptyFinalizedStates, + EmptyFinalizedBlocks, } /// Message sent to the migration thread containing the information it needs to run. 
@@ -129,19 +129,12 @@ pub enum Notification { pub struct FinalizationNotification { finalized_state_root: BeaconStateHash, finalized_checkpoint: Checkpoint, - head_tracker: Arc, prev_migration: Arc>, - genesis_block_root: Hash256, } impl, Cold: ItemStore> BackgroundMigrator { /// Create a new `BackgroundMigrator` and spawn its thread if necessary. - pub fn new( - db: Arc>, - config: MigratorConfig, - genesis_block_root: Hash256, - log: Logger, - ) -> Self { + pub fn new(db: Arc>, config: MigratorConfig, log: Logger) -> Self { // Estimate last migration run from DB split slot. let prev_migration = Arc::new(Mutex::new(PrevMigration { epoch: db.get_split_slot().epoch(E::slots_per_epoch()), @@ -156,7 +149,6 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator, ) -> Result<(), BeaconChainError> { let notif = FinalizationNotification { finalized_state_root, finalized_checkpoint, - head_tracker, prev_migration: self.prev_migration.clone(), - genesis_block_root: self.genesis_block_root, }; // Send to background thread if configured, otherwise run in foreground. @@ -333,18 +322,40 @@ impl, Cold: ItemStore> BackgroundMigrator {} + Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { + debug!( + log, + "Database migration postponed, unaligned finalized block"; + "slot" => slot.as_u64() + ); + } + Err(e) => { + warn!( + log, + "Database migration failed"; + "error" => format!("{:?}", e) + ); + return; + } + }; + + let old_finalized_checkpoint_epoch = match Self::prune_hot_db( + db.clone(), + finalized_state_root.into(), &finalized_state, notif.finalized_checkpoint, - notif.genesis_block_root, log, ) { Ok(PruningOutcome::Successful { - old_finalized_checkpoint, - }) => old_finalized_checkpoint, + old_finalized_checkpoint_epoch, + }) => old_finalized_checkpoint_epoch, Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation) => { warn!( log, @@ -367,31 +378,7 @@ impl, Cold: ItemStore> BackgroundMigrator { - warn!(log, "Block pruning failed"; "error" => ?e); - return; - } - }; - - match migrate_database( - db.clone(), - finalized_state_root.into(), - finalized_block_root, - &finalized_state, - ) { - Ok(()) => {} - Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { - debug!( - log, - "Database migration postponed, unaligned finalized block"; - "slot" => slot.as_u64() - ); - } - Err(e) => { - warn!( - log, - "Database migration failed"; - "error" => format!("{:?}", e) - ); + warn!(log, "Hot DB pruning failed"; "error" => ?e); return; } }; @@ -399,7 +386,7 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator>, - head_tracker: Arc, - new_finalized_state_hash: BeaconStateHash, + new_finalized_state_hash: Hash256, new_finalized_state: &BeaconState, new_finalized_checkpoint: Checkpoint, - genesis_block_root: Hash256, log: &Logger, ) -> Result { - let old_finalized_checkpoint = - store - .load_pruning_checkpoint()? - .unwrap_or_else(|| Checkpoint { - epoch: Epoch::new(0), - root: Hash256::zero(), - }); - - let old_finalized_slot = old_finalized_checkpoint - .epoch - .start_slot(E::slots_per_epoch()); + let split_state_root = store.get_split_info().state_root; let new_finalized_slot = new_finalized_checkpoint .epoch .start_slot(E::slots_per_epoch()); - let new_finalized_block_hash = new_finalized_checkpoint.root.into(); // The finalized state must be for the epoch boundary slot, not the slot of the finalized // block. 
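The hunk above also inverts the old order of operations: `migrate_database` now freezes
finalized states into the cold DB before `prune_hot_db` touches the hot DB. A minimal,
self-contained sketch of that control flow, where `Db`, `freeze` and `prune` are
hypothetical stand-ins for `HotColdDB`, `migrate_database` and `prune_hot_db`:

    // Sketch of the reworked migration order: freeze first, then prune.
    struct Db {
        split_slot: u64,
    }

    #[derive(Debug)]
    enum FreezeError {
        SlotUnaligned(u64), // mirrors HotColdDBError::FreezeSlotUnaligned
        Fatal(String),
    }

    impl Db {
        fn freeze(&mut self, finalized_slot: u64) -> Result<(), FreezeError> {
            if finalized_slot % 32 != 0 {
                // Not at an epoch boundary yet: postpone rather than fail.
                return Err(FreezeError::SlotUnaligned(finalized_slot));
            }
            self.split_slot = finalized_slot;
            Ok(())
        }

        fn prune(&mut self, finalized_slot: u64) -> u64 {
            // Delete abandoned forks below `finalized_slot`; return the old epoch.
            finalized_slot / 32
        }
    }

    fn process_finalization(db: &mut Db, finalized_slot: u64) {
        match db.freeze(finalized_slot) {
            Ok(()) => {}
            // An unaligned slot only postpones migration; pruning still runs.
            Err(FreezeError::SlotUnaligned(_)) => {}
            // Any other error aborts the run before pruning.
            Err(FreezeError::Fatal(_)) => return,
        }
        let _old_finalized_epoch = db.prune(finalized_slot);
    }

    fn main() {
        let mut db = Db { split_slot: 0 };
        process_finalization(&mut db, 64);
        assert_eq!(db.split_slot, 64);
    }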
@@ -504,205 +477,190 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
-        if old_finalized_slot > new_finalized_slot {
-            return Ok(PruningOutcome::OutOfOrderFinalization {
-                old_finalized_checkpoint,
-                new_finalized_checkpoint,
-            });
-        }
+        // TODO(hdiff): if we remove the check of `old_finalized_slot > new_finalized_slot` can we
+        // ensure that a single pruning operation is running at once? If a pruning run is triggered
+        // with an old finalized checkpoint it can derive a stale hdiff set of slots and delete
+        // future ones that are necessary, breaking the DB.
 
         debug!(
             log,
             "Starting database pruning";
-            "old_finalized_epoch" => old_finalized_checkpoint.epoch,
-            "new_finalized_epoch" => new_finalized_checkpoint.epoch,
+            "new_finalized_checkpoint" => ?new_finalized_checkpoint,
+            "new_finalized_state_hash" => ?new_finalized_state_hash,
         );
-        // For each slot between the new finalized checkpoint and the old finalized checkpoint,
-        // collect the beacon block root and state root of the canonical chain.
-        let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
-            std::iter::once(Ok((
-                new_finalized_slot,
-                (new_finalized_block_hash, new_finalized_state_hash),
-            )))
-            .chain(RootsIterator::new(&store, new_finalized_state).map(|res| {
-                res.map(|(block_root, state_root, slot)| {
-                    (slot, (block_root.into(), state_root.into()))
-                })
-            }))
-            .take_while(|res| {
-                res.as_ref()
-                    .map_or(true, |(slot, _)| *slot >= old_finalized_slot)
-            })
-            .collect::<Result<_, _>>()?;
 
-        // We don't know which blocks are shared among abandoned chains, so we buffer and delete
-        // everything in one fell swoop.
-        let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
-        let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new();
-        let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
+        let (state_summaries_dag, block_summaries_dag) = {
+            let state_summaries = store
+                .load_hot_state_summaries()?
+                .into_iter()
+                .map(|(state_root, summary)| (state_root, summary.into()))
+                .collect::<Vec<_>>();
+
+            // De-duplicate block roots to reduce block reads below
+            let summary_block_roots = HashSet::<Hash256>::from_iter(
+                state_summaries
+                    .iter()
+                    .map(|(_, summary)| summary.latest_block_root),
+            );
 
-        let heads = head_tracker.heads();
-        debug!(
-            log,
-            "Extra pruning information";
-            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
-            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
-            "head_count" => heads.len(),
-        );
+            // Sanity check: there is at least one summary with the new finalized block root
+            if !summary_block_roots.contains(&new_finalized_checkpoint.root) {
+                return Err(BeaconChainError::PruningError(
+                    PruningError::MissingSummaryForFinalizedCheckpoint(
+                        new_finalized_checkpoint.root,
+                    ),
+                ));
+            }
 
-        for (head_hash, head_slot) in heads {
-            // Load head block. If it fails with a decode error, it's likely a reverted block,
-            // so delete it from the head tracker but leave it and its states in the database.
-            // This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
-            // can be used to reclaim the space.
-            let head_state_root = match store.get_blinded_block(&head_hash) {
-                Ok(Some(block)) => block.state_root(),
-                Ok(None) => {
-                    return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
-                }
-                Err(Error::SszDecodeError(e)) => {
-                    warn!(
-                        log,
-                        "Forgetting invalid head block";
-                        "block_root" => ?head_hash,
-                        "error" => ?e,
-                    );
-                    abandoned_heads.insert(head_hash);
-                    continue;
-                }
-                Err(e) => return Err(e.into()),
-            };
+            let blocks = summary_block_roots
+                .iter()
+                .map(|block_root| {
+                    let block = store
+                        .get_blinded_block(block_root)?
+                        .ok_or(PruningError::MissingBlindedBlock(*block_root))?;
+                    Ok((
+                        *block_root,
+                        DAGBlockSummary {
+                            slot: block.slot(),
+                            parent_root: block.parent_root(),
+                        },
+                    ))
+                })
+                .collect::<Result<Vec<_>, BeaconChainError>>()?;
+
+            let parent_block_roots = blocks
+                .iter()
+                .map(|(block_root, block)| (*block_root, block.parent_root))
+                .collect::<HashMap<_, _>>();
+
+            (
+                StateSummariesDAG::new_from_v22(
+                    state_summaries,
+                    parent_block_roots,
+                    split_state_root,
+                )
+                .map_err(PruningError::SummariesDagError)?,
+                BlockSummariesDAG::new(&blocks),
+            )
+        };
 
-            let mut potentially_abandoned_head = Some(head_hash);
-            let mut potentially_abandoned_blocks = vec![];
-
-            // Iterate backwards from this head, staging blocks and states for deletion.
-            let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
-                .chain(RootsIterator::from_block(&store, head_hash)?);
-
-            for maybe_tuple in iter {
-                let (block_root, state_root, slot) = maybe_tuple?;
-                let block_root = SignedBeaconBlockHash::from(block_root);
-                let state_root = BeaconStateHash::from(state_root);
-
-                match newly_finalized_chain.get(&slot) {
-                    // If there's no information about a slot on the finalized chain, then
-                    // it should be because it's ahead of the new finalized slot. Stage
-                    // the fork's block and state for possible deletion.
-                    None => {
-                        if slot > new_finalized_slot {
-                            potentially_abandoned_blocks.push((
-                                slot,
-                                Some(block_root),
-                                Some(state_root),
-                            ));
-                        } else if slot >= old_finalized_slot {
-                            return Err(PruningError::MissingInfoForCanonicalChain { slot }.into());
-                        } else {
-                            // We must assume here any candidate chains include the old finalized
-                            // checkpoint, i.e. there aren't any forks starting at a block that is a
-                            // strict ancestor of old_finalized_checkpoint.
-                            warn!(
-                                log,
-                                "Found a chain that should already have been pruned";
-                                "head_block_root" => format!("{:?}", head_hash),
-                                "head_slot" => head_slot,
-                            );
-                            potentially_abandoned_head.take();
-                            break;
-                        }
-                    }
-                    Some((finalized_block_root, finalized_state_root)) => {
-                        // This fork descends from a newly finalized block, we can stop.
-                        if block_root == *finalized_block_root {
-                            // Sanity check: if the slot and block root match, then the
-                            // state roots should match too.
-                            if state_root != *finalized_state_root {
-                                return Err(PruningError::UnexpectedUnequalStateRoots.into());
-                            }
 
+        // From the DAG compute the list of roots that descend from the finalized root up to
+        // the split slot.
+        let finalized_and_descendant_block_roots = HashSet::<Hash256>::from_iter(
+            std::iter::once(new_finalized_checkpoint.root).chain(
+                // Note: the sanity check above for the existence of at least one summary with
+                // new_finalized_checkpoint.root should ensure that this call never errors
+                block_summaries_dag
+                    .descendant_block_roots_of(&new_finalized_checkpoint.root)
+                    .map_err(PruningError::SummariesDagError)?,
+            ),
+        );
 
-                            // If the fork descends from the whole finalized chain,
-                            // do not prune it. Otherwise continue to delete all
-                            // of the blocks and states that have been staged for
-                            // deletion so far.
-                            if slot == new_finalized_slot {
-                                potentially_abandoned_blocks.clear();
-                                potentially_abandoned_head.take();
-                            }
-                            // If there are skipped slots on the fork to be pruned, then
-                            // we will have just staged the common block for deletion.
-                            // Unstage it.
-                            else {
-                                for (_, block_root, _) in
-                                    potentially_abandoned_blocks.iter_mut().rev()
-                                {
-                                    if block_root.as_ref() == Some(finalized_block_root) {
-                                        *block_root = None;
-                                    } else {
-                                        break;
-                                    }
-                                }
-                            }
-                            break;
-                        } else {
-                            if state_root == *finalized_state_root {
-                                return Err(PruningError::UnexpectedEqualStateRoots.into());
-                            }
-                            potentially_abandoned_blocks.push((
-                                slot,
-                                Some(block_root),
-                                Some(state_root),
-                            ));
-                        }
-                    }
-                }
-            }
 
+        // Note: ancestors_of includes the finalized state root
+        let newly_finalized_state_summaries = state_summaries_dag
+            .ancestors_of(new_finalized_state_hash)
+            .map_err(PruningError::SummariesDagError)?;
+        let newly_finalized_state_roots = newly_finalized_state_summaries
+            .iter()
+            .map(|(root, _)| *root)
+            .collect::<HashSet<_>>();
+        let newly_finalized_states_min_slot = *newly_finalized_state_summaries
+            .iter()
+            .map(|(_, slot)| slot)
+            .min()
+            .ok_or(PruningError::EmptyFinalizedStates)?;
+
+        // Note: ancestors_of includes the finalized block
+        let newly_finalized_blocks = block_summaries_dag
+            .ancestors_of(new_finalized_checkpoint.root)
+            .map_err(PruningError::SummariesDagError)?;
+        let newly_finalized_block_roots = newly_finalized_blocks
+            .iter()
+            .map(|(root, _)| *root)
+            .collect::<HashSet<_>>();
+        let newly_finalized_blocks_min_slot = *newly_finalized_blocks
+            .iter()
+            .map(|(_, slot)| slot)
+            .min()
+            .ok_or(PruningError::EmptyFinalizedBlocks)?;
 
-            if let Some(abandoned_head) = potentially_abandoned_head {
-                debug!(
-                    log,
-                    "Pruning head";
-                    "head_block_root" => format!("{:?}", abandoned_head),
-                    "head_slot" => head_slot,
-                );
-                abandoned_heads.insert(abandoned_head);
-                abandoned_blocks.extend(
-                    potentially_abandoned_blocks
-                        .iter()
-                        .filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
-                );
-                abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
-                    |(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)),
-                ));
-            }
-        }
 
+        // We don't know which blocks are shared among abandoned chains, so we buffer and delete
+        // everything in one fell swoop.
+        let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
+        let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
+
+        for (slot, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
+            for (state_root, summary) in summaries {
+                let should_prune =
+                    if finalized_and_descendant_block_roots.contains(&summary.latest_block_root) {
+                        // Keep this state: it is the post state of a viable head, or a state
+                        // advance from a viable head.
+                        false
+                    } else {
+                        // Everything else, prune
+                        true
+                    };
+
+                if should_prune {
+                    // States are migrated into the cold DB in the migrate step. All hot states
+                    // prior to finalization can be pruned from the hot DB columns.
+                    states_to_prune.insert((slot, state_root));
+                }
+            }
+        }
 
-        // Update the head tracker before the database, so that we maintain the invariant
-        // that a block present in the head tracker is present in the database.
-        // See https://github.com/sigp/lighthouse/issues/1557
-        let mut head_tracker_lock = head_tracker.0.write();
-
-        // Check that all the heads to be deleted are still present. The absence of any
-        // head indicates a race, that will likely resolve itself, so we defer pruning until
-        // later.
-        for head_hash in &abandoned_heads {
-            if !head_tracker_lock.contains_key(head_hash) {
-                return Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation);
+        for (block_root, slot) in block_summaries_dag.iter() {
+            // Blocks both finalized and unfinalized are in the same DB column. We must only
+            // prune blocks from abandoned forks. Deriving block pruning from state
+            // summaries is tricky since now we keep some hot state summaries beyond
+            // finalization. We only prune blocks that still have an associated hot
+            // state summary, are above prior finalization and are not in the canonical chain.
+            let should_prune = if finalized_and_descendant_block_roots.contains(&block_root) {
+                // Keep unfinalized blocks that descend from the finalized block, plus the
+                // finalized block itself
+                false
+            } else if newly_finalized_block_roots.contains(&block_root) {
+                // Keep recently finalized blocks
+                false
+            } else if slot < newly_finalized_blocks_min_slot {
+                // Keep blocks below the newly finalized chain. For blocks with slots below
+                // `newly_finalized_blocks_min_slot` we have no canonical information, so we
+                // assume they are part of the already pruned finalized chain.
+                //
+                // Pruning those risks breaking the DB by deleting canonical blocks once the
+                // HDiff grid advances. If the pruning routine is correct this condition
+                // should never be hit.
+                false
+            } else {
+                // Everything else, prune
+                true
+            };
+
+            if should_prune {
+                blocks_to_prune.insert(block_root);
             }
         }
 
-        // Then remove them for real.
-        for head_hash in abandoned_heads {
-            head_tracker_lock.remove(&head_hash);
-        }
+        debug!(
+            log,
+            "Extra pruning information";
+            "new_finalized_checkpoint" => ?new_finalized_checkpoint,
+            "newly_finalized_blocks" => newly_finalized_blocks.len(),
+            "newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
+            "newly_finalized_state_roots" => newly_finalized_state_roots.len(),
+            "newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
+            "state_summaries_count" => state_summaries_dag.summaries_count(),
+            "finalized_and_descendant_block_roots" => finalized_and_descendant_block_roots.len(),
+            "blocks_to_prune_count" => blocks_to_prune.len(),
+            "states_to_prune_count" => states_to_prune.len(),
+            "blocks_to_prune" => ?blocks_to_prune,
+            "states_to_prune" => ?states_to_prune,
+        );
 
-        let mut batch: Vec<StoreOp<E>> = abandoned_blocks
+        let mut batch: Vec<StoreOp<E>> = blocks_to_prune
             .into_iter()
-            .map(Into::into)
-            .flat_map(|block_root: Hash256| {
+            .flat_map(|block_root| {
                 [
                     StoreOp::DeleteBlock(block_root),
                     StoreOp::DeleteExecutionPayload(block_root),
@@ -710,43 +668,84 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         Ok(())
     }
 
+    fn prune_execution_payloads(
+        finalized_blocks: &[(Hash256, Slot)],
+        hot_db_ops: &mut Vec<StoreOp<E>>,
+    ) {
+        for (block_root, _) in finalized_blocks {
+            // Delete the execution payload if payload pruning is enabled. At a skipped slot we
+            // may delete the payload for the finalized block itself, but that's OK as we only
+            // guarantee that payloads are present for slots >= the split slot. The payload
+            // fetching code is also forgiving of missing payloads.
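+            // For example: with blocks at slots 31 and 33 and finalization at the epoch
+            // boundary slot 32, the checkpoint root is the slot-31 block, whose payload
+            // may be deleted here once the split advances past it.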
+            hot_db_ops.push(StoreOp::DeleteExecutionPayload(*block_root));
+        }
+    }
+
+    fn prune_non_checkpoint_sync_committee_branches(
+        finalized_blocks_desc: &[(Hash256, Slot)],
+        hot_db_ops: &mut Vec<StoreOp<E>>,
+    ) {
+        let mut epoch_boundary_blocks = HashSet::new();
+        let mut non_checkpoint_block_roots = HashSet::new();
+
+        // Iterate blocks in slot-ascending order (the input is sorted descending).
+        for (block_root, slot) in finalized_blocks_desc.iter().rev() {
+            // At a missed slot, the roots list contains the block root
+            // from the previous non-missed slot. This ensures that the block root at an
+            // epoch boundary is always a checkpoint block root. We keep track of block roots
+            // at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set.
+            // We then ensure that block roots at the epoch boundary aren't included in the
+            // `non_checkpoint_block_roots` hash set.
+            if *slot % E::slots_per_epoch() == 0 {
+                epoch_boundary_blocks.insert(block_root);
+            } else {
+                non_checkpoint_block_roots.insert(block_root);
+            }
+
+            if epoch_boundary_blocks.contains(&block_root) {
+                non_checkpoint_block_roots.remove(&block_root);
+            }
+        }
+
+        // Prune sync committee branch data for all non-checkpoint block roots.
+        // Note that `non_checkpoint_block_roots` should only contain non-checkpoint block roots
+        // as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case
+        // we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root.
+        // E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` =
+        // (Epoch C slot 31) and (Epoch D slot 0) is a skipped slot, we will have pruned a
+        // `sync_committee_branch` for a checkpoint block root.
+        non_checkpoint_block_roots
+            .into_iter()
+            .for_each(|block_root| {
+                hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(*block_root));
+            });
+    }
+
     /// Compact the database if it has been more than `COMPACTION_PERIOD_SECONDS` since it
     /// was last compacted.
     pub fn run_compaction(
diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs
index adb68def0df..d834bd1af6a 100644
--- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs
@@ -1,8 +1,7 @@
-use crate::head_tracker::SszHeadTracker;
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use store::{DBColumn, Error as StoreError, StoreItem};
-use types::Hash256;
+use types::{Hash256, Slot};
 
 /// Dummy value to use for the canonical head block root, see below.
 pub const DUMMY_CANONICAL_HEAD_BLOCK_ROOT: Hash256 = Hash256::repeat_byte(0xff);
@@ -18,9 +17,17 @@ pub struct PersistedBeaconChain {
     /// https://github.com/sigp/lighthouse/issues/1784
     pub _canonical_head_block_root: Hash256,
     pub genesis_block_root: Hash256,
+    /// DEPRECATED
     pub ssz_head_tracker: SszHeadTracker,
 }
 
+/// DEPRECATED
+#[derive(Encode, Decode, Clone, Default)]
+pub struct SszHeadTracker {
+    roots: Vec<Hash256>,
+    slots: Vec<Slot>,
+}
+
 impl StoreItem for PersistedBeaconChain {
     fn db_column() -> DBColumn {
         DBColumn::BeaconChain
diff --git a/beacon_node/beacon_chain/src/summaries_dag.rs b/beacon_node/beacon_chain/src/summaries_dag.rs
new file mode 100644
index 00000000000..0ec1cae7bcb
--- /dev/null
+++ b/beacon_node/beacon_chain/src/summaries_dag.rs
@@ -0,0 +1,353 @@
+use std::{
+    cmp::Ordering,
+    collections::{BTreeMap, HashMap},
+};
+use store::HotStateSummary;
+use types::{Hash256, Slot};
+
+#[derive(Debug, Clone, Copy)]
+pub struct DAGStateSummary {
+    pub slot: Slot,
+    pub latest_block_root: Hash256,
+    pub previous_state_root: Hash256,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct DAGStateSummaryV22 {
+    pub slot: Slot,
+    pub latest_block_root: Hash256,
+}
+
+pub struct StateSummariesDAG {
+    // state_root -> state_summary
+    state_summaries_by_state_root: HashMap<Hash256, DAGStateSummary>,
+    // block_root -> state slot -> (state_root, state summary)
+    state_summaries_by_block_root: HashMap<Hash256, BTreeMap<Slot, (Hash256, DAGStateSummary)>>,
+}
+
+#[derive(Debug)]
+pub enum Error {
+    MissingParentBlockRoot(Hash256),
+    MissingStateSummary(Hash256),
+    MissingStateSummaryAtSlot(Hash256, Slot),
+    MissingChildBlockRoot(Hash256),
+    MissingBlock(Hash256),
+    RequestedSlotAboveSummary(Hash256, Slot),
+}
+
+impl StateSummariesDAG {
+    pub fn new(state_summaries: Vec<(Hash256, DAGStateSummary)>) -> Self {
+        // Group them by latest block root, sorted by state slot
+        let mut state_summaries_by_block_root = HashMap::<_, BTreeMap<_, _>>::new();
+        let mut state_summaries_by_state_root = HashMap::new();
+        for (state_root, summary) in state_summaries.into_iter() {
+            let summaries = state_summaries_by_block_root
+                .entry(summary.latest_block_root)
+                .or_default();
+
+            // TODO(hdiff): error if existing
+            summaries.insert(summary.slot, (state_root, summary));
+
+            state_summaries_by_state_root.insert(state_root, summary);
+        }
+
+        Self {
+            state_summaries_by_state_root,
+            state_summaries_by_block_root,
+        }
+    }
+
+    pub fn new_from_v22(
+        state_summaries_v22: Vec<(Hash256, DAGStateSummaryV22)>,
+        parent_block_roots: HashMap<Hash256, Hash256>,
+        base_root: Hash256,
+    ) -> Result<Self, Error> {
+        // Group them by latest block root, sorted by state slot
+        let mut state_summaries_by_block_root = HashMap::<_, BTreeMap<_, _>>::new();
+        for (state_root, summary) in state_summaries_v22.iter() {
+            let summaries = state_summaries_by_block_root
+                .entry(summary.latest_block_root)
+                .or_default();
+
+            // TODO(hdiff): error if existing
+            summaries.insert(summary.slot, (state_root, summary));
+        }
+
+        let state_summaries = state_summaries_v22
+            .iter()
+            .map(|(state_root, summary)| {
+                let previous_state_root = if summary.slot == 0 || *state_root == base_root {
+                    Hash256::ZERO
+                } else {
+                    let previous_slot = summary.slot - 1;
+
+                    // Check the set of summaries that share this state's latest block root
+                    let same_block_root_summaries = state_summaries_by_block_root
+                        .get(&summary.latest_block_root)
+                        .ok_or(Error::MissingStateSummary(summary.latest_block_root))?;
+                    if let Some((state_root, _)) = same_block_root_summaries.get(&previous_slot) {
+                        // Skipped slot: the block root at the previous slot is the same as the
+                        // latest block root.
+                        **state_root
+                    } else {
+                        // Common case: not a skipped slot.
+                        let parent_block_root = parent_block_roots
+                            .get(&summary.latest_block_root)
+                            .ok_or(Error::MissingParentBlockRoot(summary.latest_block_root))?;
+                        *state_summaries_by_block_root
+                            .get(parent_block_root)
+                            .ok_or(Error::MissingStateSummary(*parent_block_root))?
+                            .get(&previous_slot)
+                            .ok_or(Error::MissingStateSummaryAtSlot(
+                                *parent_block_root,
+                                previous_slot,
+                            ))?
+                            .0
+                    }
+                };
+
+                Ok((
+                    *state_root,
+                    DAGStateSummary {
+                        slot: summary.slot,
+                        latest_block_root: summary.latest_block_root,
+                        previous_state_root,
+                    },
+                ))
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(Self::new(state_summaries))
+    }
+
+    pub fn summaries_count(&self) -> usize {
+        self.state_summaries_by_block_root
+            .values()
+            .map(|s| s.len())
+            .sum()
+    }
+
+    pub fn summaries_by_slot_ascending(&self) -> BTreeMap<Slot, Vec<(Hash256, DAGStateSummary)>> {
+        let mut summaries = BTreeMap::<Slot, Vec<_>>::new();
+        for (slot, (state_root, summary)) in self
+            .state_summaries_by_block_root
+            .values()
+            .flat_map(|slot_map| slot_map.iter())
+        {
+            summaries
+                .entry(*slot)
+                .or_default()
+                .push((*state_root, *summary));
+        }
+        summaries
+    }
+
+    pub fn previous_state_root(&self, state_root: Hash256) -> Result<Hash256, Error> {
+        Ok(self
+            .state_summaries_by_state_root
+            .get(&state_root)
+            .ok_or(Error::MissingStateSummary(state_root))?
+            .previous_state_root)
+    }
+
+    pub fn ancestor_state_root_at_slot(
+        &self,
+        mut state_root: Hash256,
+        ancestor_slot: Slot,
+    ) -> Result<Hash256, Error> {
+        // Walk backwards until we reach the state at `ancestor_slot`.
+        loop {
+            let summary = self
+                .state_summaries_by_state_root
+                .get(&state_root)
+                .ok_or(Error::MissingStateSummary(state_root))?;
+
+            // Assumes all summaries are contiguous
+            match summary.slot.cmp(&ancestor_slot) {
+                Ordering::Less => {
+                    return Err(Error::RequestedSlotAboveSummary(state_root, ancestor_slot))
+                }
+                Ordering::Equal => {
+                    return Ok(state_root);
+                }
+                Ordering::Greater => {
+                    state_root = summary.previous_state_root;
+                }
+            }
+        }
+    }
+
+    /// Returns all ancestors of `state_root` INCLUDING `state_root` until the next parent is not
+    /// known.
+    pub fn ancestors_of(&self, mut state_root: Hash256) -> Result<Vec<(Hash256, Slot)>, Error> {
+        // Sanity check that the first summary exists
+        if !self.state_summaries_by_state_root.contains_key(&state_root) {
+            return Err(Error::MissingStateSummary(state_root));
+        }
+
+        let mut ancestors = vec![];
+        loop {
+            if let Some(summary) = self.state_summaries_by_state_root.get(&state_root) {
+                ancestors.push((state_root, summary.slot));
+                state_root = summary.previous_state_root
+            } else {
+                return Ok(ancestors);
+            }
+        }
+    }
+}
+
+impl From<HotStateSummary> for DAGStateSummaryV22 {
+    fn from(value: HotStateSummary) -> Self {
+        Self {
+            slot: value.slot,
+            latest_block_root: value.latest_block_root,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct DAGBlockSummary {
+    pub slot: Slot,
+    pub parent_root: Hash256,
+}
+
+pub struct BlockSummariesDAG {
+    // parent_block_root -> Vec<(child_block_root, child_block)>
+    child_block_roots: HashMap<Hash256, Vec<(Hash256, DAGBlockSummary)>>,
+    // block_root -> block
+    blocks_by_block_root: HashMap<Hash256, DAGBlockSummary>,
+}
+
+impl BlockSummariesDAG {
+    pub fn new(blocks: &[(Hash256, DAGBlockSummary)]) -> Self {
+        // Construct a parent -> children mapping, plus a block_root -> block map.
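+        // Every block root gets an entry in `child_block_roots`, possibly empty. This is
+        // what lets `descendant_block_roots_of` below distinguish a known leaf block
+        // (entry with an empty child list) from an unknown root (no entry, an error).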
+        let mut child_block_roots = HashMap::<_, Vec<_>>::new();
+        let mut blocks_by_block_root = HashMap::new();
+
+        for (block_root, block) in blocks {
+            child_block_roots
+                .entry(block.parent_root)
+                .or_default()
+                .push((*block_root, *block));
+            // Add empty entry for the child block
+            child_block_roots.entry(*block_root).or_default();
+
+            blocks_by_block_root.insert(*block_root, *block);
+        }
+
+        Self {
+            child_block_roots,
+            blocks_by_block_root,
+        }
+    }
+
+    pub fn descendant_block_roots_of(&self, block_root: &Hash256) -> Result<Vec<Hash256>, Error> {
+        let mut descendants = vec![];
+        for (child_root, _) in self
+            .child_block_roots
+            .get(block_root)
+            .ok_or(Error::MissingChildBlockRoot(*block_root))?
+        {
+            descendants.push(*child_root);
+            descendants.extend(self.descendant_block_roots_of(child_root)?);
+        }
+        Ok(descendants)
+    }
+
+    /// Returns all ancestors of `block_root` INCLUDING `block_root` until the next parent is not
+    /// known.
+    pub fn ancestors_of(&self, mut block_root: Hash256) -> Result<Vec<(Hash256, Slot)>, Error> {
+        // Sanity check that the first block exists
+        if !self.blocks_by_block_root.contains_key(&block_root) {
+            return Err(Error::MissingBlock(block_root));
+        }
+
+        let mut ancestors = vec![];
+        loop {
+            if let Some(block) = self.blocks_by_block_root.get(&block_root) {
+                ancestors.push((block_root, block.slot));
+                block_root = block.parent_root
+            } else {
+                return Ok(ancestors);
+            }
+        }
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = (Hash256, Slot)> + '_ {
+        self.blocks_by_block_root
+            .iter()
+            .map(|(block_root, block)| (*block_root, block.slot))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{BlockSummariesDAG, DAGBlockSummary, DAGStateSummaryV22, Error, StateSummariesDAG};
+    use bls::FixedBytesExtended;
+    use std::collections::HashMap;
+    use types::{Hash256, Slot};
+
+    fn root(n: u64) -> Hash256 {
+        Hash256::from_low_u64_le(n)
+    }
+
+    fn block_with_parent(parent_root: Hash256) -> DAGBlockSummary {
+        DAGBlockSummary {
+            slot: Slot::new(0),
+            parent_root,
+        }
+    }
+
+    #[test]
+    fn new_from_v22_empty() {
+        StateSummariesDAG::new_from_v22(vec![], HashMap::new(), Hash256::ZERO).unwrap();
+    }
+
+    #[test]
+    fn new_from_v22_one_state() {
+        let root_a = root(0xa);
+        let root_1 = root(1);
+        let root_2 = root(2);
+        let summary_1 = DAGStateSummaryV22 {
+            slot: Slot::new(1),
+            latest_block_root: root_1,
+        };
+        // (child, parent)
+        let parents = HashMap::from_iter([(root_1, root_2)]);
+
+        let dag =
+            StateSummariesDAG::new_from_v22(vec![(root_a, summary_1)], parents, root_a).unwrap();
+
+        // The parent of the root summary is ZERO
+        assert_eq!(dag.previous_state_root(root_a).unwrap(), Hash256::ZERO);
+    }
+
+    #[test]
+    fn descendant_block_roots_of() {
+        let root_1 = root(1);
+        let root_2 = root(2);
+        let root_3 = root(3);
+        let blocks = vec![(root_1, block_with_parent(root_2))];
+        let dag = BlockSummariesDAG::new(&blocks);
+
+        // root 1 is known and has no children
+        assert_eq!(
+            dag.descendant_block_roots_of(&root_1).unwrap(),
+            Vec::<Hash256>::new()
+        );
+        // root 2 is known and has children
+        assert_eq!(
+            dag.descendant_block_roots_of(&root_2).unwrap(),
+            vec![root_1]
+        );
+        // root 3 is not known
+        {
+            let err = dag.descendant_block_roots_of(&root_3).unwrap_err();
+            if let Error::MissingChildBlockRoot(_) = err {
+                // ok
+            } else {
+                panic!("unexpected err {err:?}");
+            }
+        }
+    }
+}
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index da3e6d4ebcb..e22b5001d63 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -2467,6 +2467,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.hot_db.get(state_root)
     }
 
+    /// Load all hot state summaries present in the hot DB.
+    pub fn load_hot_state_summaries(&self) -> Result<Vec<(Hash256, HotStateSummary)>, Error> {
+        self.hot_db
+            .iter_column::<Hash256>(DBColumn::BeaconStateSummary)
+            .map(|res| {
+                let (state_root, value) = res?;
+                let summary = HotStateSummary::from_ssz_bytes(&value)?;
+                Ok((state_root, summary))
+            })
+            .collect()
+    }
+
     /// Load the temporary flag for a state root, if one exists.
     ///
     /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not
diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs
index 38ea1411994..4c679d10350 100644
--- a/consensus/proto_array/src/proto_array.rs
+++ b/consensus/proto_array/src/proto_array.rs
@@ -1041,6 +1041,37 @@ impl ProtoArray {
             })
             .map(|node| node.root)
     }
+
+    /// Returns all nodes that have zero children and are viable heads.
+    pub fn viable_heads<E: EthSpec>(&self, current_slot: Slot) -> Vec<&ProtoNode> {
+        self.nodes_without_children()
+            .into_iter()
+            .filter_map(|index| {
+                // An unknown index is not a viable head
+                if let Some(node) = self.nodes.get(index) {
+                    if self.node_is_viable_for_head::<E>(node, current_slot) {
+                        return Some(node);
+                    }
+                }
+                None
+            })
+            .collect()
+    }
+
+    /// Returns all node indices that have zero children. May include unviable nodes.
+    fn nodes_without_children(&self) -> Vec<usize> {
+        let mut children_of = HashMap::<_, Vec<_>>::new();
+        for (index, node) in self.nodes.iter().enumerate() {
+            // Every node gets an entry, so that leaves show up with an empty child
+            // list rather than being absent from the map.
+            children_of.entry(index).or_default();
+            if let Some(parent_index) = node.parent {
+                children_of.entry(parent_index).or_default().push(index);
+            }
+        }
+        children_of
+            .into_iter()
+            .filter(|(_, children)| children.is_empty())
+            .map(|(index, _)| index)
+            .collect()
+    }
 }
 
 /// A helper method to calculate the proposer boost based on the given `justified_balances`.
diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs
index 88d46603117..035817d5c28 100644
--- a/consensus/proto_array/src/proto_array_fork_choice.rs
+++ b/consensus/proto_array/src/proto_array_fork_choice.rs
@@ -885,6 +885,11 @@ impl ProtoArrayForkChoice {
     pub fn core_proto_array_mut(&mut self) -> &mut ProtoArray {
         &mut self.proto_array
     }
+
+    /// Returns all nodes that have zero children and are viable heads.
+    pub fn viable_heads<E: EthSpec>(&self, current_slot: Slot) -> Vec<&ProtoNode> {
+        self.proto_array.viable_heads::<E>(current_slot)
+    }
 }
 
 /// Returns a list of `deltas`, where there is one delta for each of the indices in
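With the head tracker gone, head enumeration comes from fork choice (`viable_heads`) and
pruning reasons over the two DAGs introduced above. A usage sketch of the state-summaries
DAG as the pruning run uses it; this assumes the `summaries_dag` module from this patch,
and the `root` closure is an illustrative helper only:

    use beacon_chain::summaries_dag::{DAGStateSummary, StateSummariesDAG};
    use types::{FixedBytesExtended, Hash256, Slot};

    fn main() {
        let root = |n: u64| Hash256::from_low_u64_le(n);

        // A three-state chain, all under the same latest block root: 1 <- 2 <- 3.
        let summaries = vec![
            (
                root(1),
                DAGStateSummary {
                    slot: Slot::new(1),
                    latest_block_root: root(10),
                    previous_state_root: Hash256::ZERO,
                },
            ),
            (
                root(2),
                DAGStateSummary {
                    slot: Slot::new(2),
                    latest_block_root: root(10),
                    previous_state_root: root(1),
                },
            ),
            (
                root(3),
                DAGStateSummary {
                    slot: Slot::new(3),
                    latest_block_root: root(10),
                    previous_state_root: root(2),
                },
            ),
        ];
        let dag = StateSummariesDAG::new(summaries);

        // As in the pruning run, ancestors_of includes the starting state root itself
        // and walks `previous_state_root` links until the parent is unknown.
        let ancestors = dag.ancestors_of(root(3)).unwrap();
        assert_eq!(ancestors.len(), 3);
    }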