Skip to content

Commit

Permalink
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
Browse files Browse the repository at this point in the history
…to das-fetch-blobs
  • Loading branch information
jimmygchen committed Sep 19, 2024
2 parents e76d21f + 3444281 commit 4b2956f
Show file tree
Hide file tree
Showing 36 changed files with 1,311 additions and 1,052 deletions.
252 changes: 47 additions & 205 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ derivative = "2"
dirs = "3"
either = "1.9"
rust_eth_kzg = "0.5.1"
discv5 = { version = "0.4.1", features = ["libp2p"] }
discv5 = { version = "0.7", features = ["libp2p"] }
env_logger = "0.9"
error-chain = "0.12"
ethereum_hashing = "0.7.0"
Expand Down
36 changes: 11 additions & 25 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7090,32 +7090,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<(LightClientBootstrap<T::EthSpec>, ForkName)>, Error> {
let Some(block) = self.get_blinded_block(block_root)? else {
return Ok(None);
};

let (state_root, slot) = (block.state_root(), block.slot());

let Some(mut state) = self.get_state(&state_root, Some(slot))? else {
return Ok(None);
};
let head_state = &self.head().snapshot.beacon_state;
let finalized_period = head_state
.finalized_checkpoint()
.epoch
.sync_committee_period(&self.spec)?;

let fork_name = state
.fork_name(&self.spec)
.map_err(Error::InconsistentFork)?;

match fork_name {
ForkName::Altair
| ForkName::Bellatrix
| ForkName::Capella
| ForkName::Deneb
| ForkName::Electra => {
LightClientBootstrap::from_beacon_state(&mut state, &block, &self.spec)
.map(|bootstrap| Some((bootstrap, fork_name)))
.map_err(Error::LightClientError)
}
ForkName::Base => Err(Error::UnsupportedFork),
}
self.light_client_server_cache.get_light_client_bootstrap(
&self.store,
block_root,
finalized_period,
&self.spec,
)
}

pub fn metrics(&self) -> BeaconChainMetrics {
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ pub enum BeaconChainError {
UnableToPublish,
UnableToBuildColumnSidecar(String),
AvailabilityCheckError(AvailabilityCheckError),
LightClientError(LightClientError),
LightClientUpdateError(LightClientUpdateError),
LightClientBootstrapError(String),
UnsupportedFork,
MilhouseError(MilhouseError),
EmptyRpcCustodyColumns,
Expand Down Expand Up @@ -250,7 +251,7 @@ easy_from_to!(BlockReplayError, BeaconChainError);
easy_from_to!(InconsistentFork, BeaconChainError);
easy_from_to!(AvailabilityCheckError, BeaconChainError);
easy_from_to!(EpochCacheError, BeaconChainError);
easy_from_to!(LightClientError, BeaconChainError);
easy_from_to!(LightClientUpdateError, BeaconChainError);
easy_from_to!(MilhouseError, BeaconChainError);
easy_from_to!(AttestationError, BeaconChainError);

Expand Down
174 changes: 124 additions & 50 deletions beacon_node/beacon_chain/src/light_client_server_cache.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use crate::errors::BeaconChainError;
use crate::{metrics, BeaconChainTypes, BeaconStore};
use eth2::types::light_client_update::CurrentSyncCommitteeProofLen;
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use slog::{debug, Logger};
use ssz::Decode;
use ssz::Encode;
use ssz_types::FixedVector;
use std::num::NonZeroUsize;
use std::sync::Arc;
use store::DBColumn;
use store::KeyValueStore;
use tree_hash::TreeHash;
use types::light_client_update::{
FinalizedRootProofLen, NextSyncCommitteeProofLen, FINALIZED_ROOT_INDEX,
NEXT_SYNC_COMMITTEE_INDEX,
FinalizedRootProofLen, NextSyncCommitteeProofLen, CURRENT_SYNC_COMMITTEE_INDEX,
FINALIZED_ROOT_INDEX, NEXT_SYNC_COMMITTEE_INDEX,
};
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, Slot, SyncAggregate, SyncCommittee,
BeaconBlockRef, BeaconState, ChainSpec, Checkpoint, EthSpec, ForkName, Hash256,
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
LightClientUpdate, Slot, SyncAggregate, SyncCommittee,
};

/// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the
Expand All @@ -28,7 +30,6 @@ const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);
/// This cache computes light client messages ahead of time, required to satisfy p2p and API
/// requests. These messages include proofs on historical states, so on-demand computation is
/// expensive.
///
pub struct LightClientServerCache<T: BeaconChainTypes> {
/// Tracks a single global latest finality update out of all imported blocks.
///
Expand All @@ -41,6 +42,8 @@ pub struct LightClientServerCache<T: BeaconChainTypes> {
latest_optimistic_update: RwLock<Option<LightClientOptimisticUpdate<T::EthSpec>>>,
/// Caches the most recent light client update
latest_light_client_update: RwLock<Option<LightClientUpdate<T::EthSpec>>>,
/// Caches the current sync committee,
latest_written_current_sync_committee: RwLock<Option<Arc<SyncCommittee<T::EthSpec>>>>,
/// Caches state proofs by block root
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
}
Expand All @@ -51,6 +54,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
latest_finality_update: None.into(),
latest_optimistic_update: None.into(),
latest_light_client_update: None.into(),
latest_written_current_sync_committee: None.into(),
prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
}
}
Expand Down Expand Up @@ -96,6 +100,10 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
let signature_slot = block_slot;
let attested_block_root = block_parent_root;

let sync_period = block_slot
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;

let attested_block = store.get_blinded_block(attested_block_root)?.ok_or(
BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
Expand All @@ -110,6 +118,18 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
attested_block.slot(),
)?;

let finalized_period = cached_parts
.finalized_checkpoint
.epoch
.sync_committee_period(chain_spec)?;

store.store_sync_committee_branch(
attested_block.message().tree_hash_root(),
&cached_parts.current_sync_committee_branch,
)?;

self.store_current_sync_committee(&store, &cached_parts, sync_period, finalized_period)?;

let attested_slot = attested_block.slot();

let maybe_finalized_block = store.get_blinded_block(&cached_parts.finalized_block_root)?;
Expand Down Expand Up @@ -178,57 +198,57 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {

// Spec: Full nodes SHOULD provide the best derivable LightClientUpdate (according to is_better_update)
// for each sync committee period
let prev_light_client_update = match &self.latest_light_client_update.read().clone() {
Some(prev_light_client_update) => Some(prev_light_client_update.clone()),
None => self.get_light_client_update(&store, sync_period, chain_spec)?,
};
let prev_light_client_update =
self.get_light_client_update(&store, sync_period, chain_spec)?;

let should_persist_light_client_update =
if let Some(prev_light_client_update) = prev_light_client_update {
let prev_sync_period = prev_light_client_update
.signature_slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;

if sync_period != prev_sync_period {
true
} else {
prev_light_client_update
.is_better_light_client_update(&new_light_client_update, chain_spec)?
}
prev_light_client_update
.is_better_light_client_update(&new_light_client_update, chain_spec)?
} else {
true
};

if should_persist_light_client_update {
self.store_light_client_update(&store, sync_period, &new_light_client_update)?;
store.store_light_client_update(sync_period, &new_light_client_update)?;
*self.latest_light_client_update.write() = Some(new_light_client_update);
}

Ok(())
}

fn store_light_client_update(
fn store_current_sync_committee(
&self,
store: &BeaconStore<T>,
cached_parts: &LightClientCachedData<T::EthSpec>,
sync_committee_period: u64,
light_client_update: &LightClientUpdate<T::EthSpec>,
finalized_period: u64,
) -> Result<(), BeaconChainError> {
let column = DBColumn::LightClientUpdate;

store.hot_db.put_bytes(
column.into(),
&sync_committee_period.to_le_bytes(),
&light_client_update.as_ssz_bytes(),
)?;
if let Some(latest_sync_committee) =
self.latest_written_current_sync_committee.read().clone()
{
if latest_sync_committee == cached_parts.current_sync_committee {
return Ok(());
}
};

*self.latest_light_client_update.write() = Some(light_client_update.clone());
if finalized_period + 1 >= sync_committee_period {
store.store_sync_committee(
sync_committee_period,
&cached_parts.current_sync_committee,
)?;
*self.latest_written_current_sync_committee.write() =
Some(cached_parts.current_sync_committee.clone());
}

Ok(())
}

// Used to fetch the most recently persisted "best" light client update.
// Should not be used outside the light client server, as it also caches the fetched
// light client update.
/// Used to fetch the most recently persisted light client update for the given `sync_committee_period`.
/// It first checks the `latest_light_client_update` cache before querying the db.
///
/// Note: Should not be used outside the light client server, as it also caches the fetched
/// light client update.
fn get_light_client_update(
&self,
store: &BeaconStore<T>,
Expand All @@ -245,21 +265,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
}
}

let column = DBColumn::LightClientUpdate;
let res = store
.hot_db
.get_bytes(column.into(), &sync_committee_period.to_le_bytes())?;

if let Some(light_client_update_bytes) = res {
let epoch = sync_committee_period
.safe_mul(chain_spec.epochs_per_sync_committee_period.into())?;

let fork_name = chain_spec.fork_name_at_epoch(epoch.into());

let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)
.map_err(store::errors::Error::SszDecodeError)?;

if let Some(light_client_update) = store.get_light_client_update(sync_committee_period)? {
*self.latest_light_client_update.write() = Some(light_client_update.clone());
return Ok(Some(light_client_update));
}
Expand Down Expand Up @@ -340,6 +346,65 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
pub fn get_latest_optimistic_update(&self) -> Option<LightClientOptimisticUpdate<T::EthSpec>> {
self.latest_optimistic_update.read().clone()
}

/// Fetches a light client bootstrap for a given finalized checkpoint `block_root`. We eagerly persist
/// `sync_committee_branch and `sync_committee` to allow for a more efficient bootstrap construction.
///
/// Note: It should be the case that a `sync_committee_branch` and `sync_committee` exist in the db
/// for a finalized checkpoint block root. However, we currently have no backfill mechanism for these values.
/// Therefore, `sync_committee_branch` and `sync_committee` are only persisted while a node is synced.
#[allow(clippy::type_complexity)]
pub fn get_light_client_bootstrap(
&self,
store: &BeaconStore<T>,
block_root: &Hash256,
finalized_period: u64,
chain_spec: &ChainSpec,
) -> Result<Option<(LightClientBootstrap<T::EthSpec>, ForkName)>, BeaconChainError> {
let Some(block) = store.get_blinded_block(block_root)? else {
return Err(BeaconChainError::LightClientBootstrapError(format!(
"Block root {block_root} not found"
)));
};

let (_, slot) = (block.state_root(), block.slot());

let fork_name = chain_spec.fork_name_at_slot::<T::EthSpec>(slot);

let sync_committee_period = block
.slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;

let Some(current_sync_committee_branch) = store.get_sync_committee_branch(block_root)?
else {
return Err(BeaconChainError::LightClientBootstrapError(format!(
"Sync committee branch for block root {:?} not found",
block_root
)));
};

if sync_committee_period > finalized_period {
return Err(BeaconChainError::LightClientBootstrapError(
format!("The blocks sync committee period {sync_committee_period} is greater than the current finalized period {finalized_period}"),
));
}

let Some(current_sync_committee) = store.get_sync_committee(sync_committee_period)? else {
return Err(BeaconChainError::LightClientBootstrapError(format!(
"Sync committee for period {sync_committee_period} not found"
)));
};

let light_client_bootstrap = LightClientBootstrap::new(
&block,
Arc::new(current_sync_committee),
current_sync_committee_branch,
chain_spec,
)?;

Ok(Some((light_client_bootstrap, fork_name)))
}
}

impl<T: BeaconChainTypes> Default for LightClientServerCache<T> {
Expand All @@ -350,23 +415,32 @@ impl<T: BeaconChainTypes> Default for LightClientServerCache<T> {

type FinalityBranch = FixedVector<Hash256, FinalizedRootProofLen>;
type NextSyncCommitteeBranch = FixedVector<Hash256, NextSyncCommitteeProofLen>;
type CurrentSyncCommitteeBranch = FixedVector<Hash256, CurrentSyncCommitteeProofLen>;

#[derive(Clone)]
struct LightClientCachedData<E: EthSpec> {
finalized_checkpoint: Checkpoint,
finality_branch: FinalityBranch,
next_sync_committee_branch: NextSyncCommitteeBranch,
current_sync_committee_branch: CurrentSyncCommitteeBranch,
next_sync_committee: Arc<SyncCommittee<E>>,
current_sync_committee: Arc<SyncCommittee<E>>,
finalized_block_root: Hash256,
}

impl<E: EthSpec> LightClientCachedData<E> {
fn from_state(state: &mut BeaconState<E>) -> Result<Self, BeaconChainError> {
Ok(Self {
finalized_checkpoint: state.finalized_checkpoint(),
finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(),
next_sync_committee: state.next_sync_committee()?.clone(),
current_sync_committee: state.current_sync_committee()?.clone(),
next_sync_committee_branch: state
.compute_merkle_proof(NEXT_SYNC_COMMITTEE_INDEX)?
.into(),
current_sync_committee_branch: state
.compute_merkle_proof(CURRENT_SYNC_COMMITTEE_INDEX)?
.into(),
finalized_block_root: state.finalized_checkpoint().root,
})
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root),
StoreOp::DeleteBlobs(block_root),
StoreOp::DeleteSyncCommitteeBranch(block_root),
]
})
.chain(
Expand Down
Loading

0 comments on commit 4b2956f

Please sign in to comment.