Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent sync lookups from reverting to awaiting block #6443

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::single_block_lookup::{ComponentRequests, DownloadResult};
use super::SingleLookupId;

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait RequestState<T: BeaconChainTypes> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;

Expand All @@ -61,7 +61,7 @@ pub trait RequestState<T: BeaconChainTypes> {
fn response_type() -> ResponseType;

/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;

/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
Expand All @@ -77,7 +77,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
Expand Down Expand Up @@ -107,8 +107,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Block
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
Ok(&mut request.block_request_state)
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand All @@ -125,10 +125,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block)
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand All @@ -150,8 +150,13 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand All @@ -169,10 +174,10 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, downloaded_block)
cx.custody_lookup_request(id, self.block_root)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand Down Expand Up @@ -200,8 +205,13 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request_state = R::request_state_mut(lookup)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut();

match response {
Ok((response, peer_group, seen_timestamp)) => {
Expand Down Expand Up @@ -504,7 +506,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request_state = R::request_state_mut(lookup)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut();

debug!(
self.log,
Expand Down
117 changes: 97 additions & 20 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::sync::network_context::{
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor,
SyncNetworkContext,
};
use beacon_chain::BeaconChainTypes;
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use derivative::Derivative;
use lighthouse_network::service::api_types::Id;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -62,8 +62,7 @@ pub enum LookupRequestError {
pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub custody_request_state: CustodyRequestState<T::EthSpec>,
pub component_requests: ComponentRequests<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
Expand All @@ -72,6 +71,16 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
created: Instant,
}

#[derive(Debug)]
pub(crate) enum ComponentRequests<E: EthSpec> {
WaitingForBlock,
ActiveBlobRequest(BlobRequestState<E>, usize),
ActiveCustodyRequest(CustodyRequestState<E>),
// When printing in debug this state display the reason why it's not needed
#[allow(dead_code)]
NotNeeded(&'static str),
}

impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn new(
requested_block_root: Hash256,
Expand All @@ -82,8 +91,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Self {
id,
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
custody_request_state: CustodyRequestState::new(requested_block_root),
component_requests: ComponentRequests::WaitingForBlock,
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
Expand Down Expand Up @@ -142,16 +150,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Returns true if the block has already been downloaded.
pub fn all_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
&& self.custody_request_state.state.is_processed()
&& match &self.component_requests {
ComponentRequests::WaitingForBlock => false,
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
ComponentRequests::NotNeeded { .. } => true,
}
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
|| self.custody_request_state.state.is_awaiting_event()
|| match &self.component_requests {
ComponentRequests::WaitingForBlock => true,
ComponentRequests::ActiveBlobRequest(request, _) => {
request.state.is_awaiting_event()
}
ComponentRequests::ActiveCustodyRequest(request) => {
request.state.is_awaiting_event()
}
ComponentRequests::NotNeeded { .. } => false,
}
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand All @@ -161,9 +181,66 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;

if let ComponentRequests::WaitingForBlock = self.component_requests {
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.cloned();

if let Some(block) = downloaded_block.or_else(|| {
// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match cx.chain.get_block_process_status(&self.block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) {
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
if expected_blobs == 0 {
self.component_requests = ComponentRequests::NotNeeded("no data");
}
if cx.chain.should_fetch_blobs(block_epoch) {
self.component_requests = ComponentRequests::ActiveBlobRequest(
BlobRequestState::new(self.block_root),
expected_blobs,
);
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
self.component_requests = ComponentRequests::ActiveCustodyRequest(
CustodyRequestState::new(self.block_root),
);
} else {
self.component_requests = ComponentRequests::NotNeeded("outside da window");
}
} else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
// latter handle the case where if the peer sent no blobs, penalize.
//
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
// and get dropped as completed.
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
}
}

match &self.component_requests {
ComponentRequests::WaitingForBlock => {} // do nothing
ComponentRequests::ActiveBlobRequest(_, expected_blobs) => {
self.continue_request::<BlobRequestState<T::EthSpec>>(cx, *expected_blobs)?
}
ComponentRequests::ActiveCustodyRequest(_) => {
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
}
ComponentRequests::NotNeeded { .. } => {} // do nothing
}

// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
Expand All @@ -179,15 +256,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.cloned();
let request = R::request_state_mut(self);
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

// Attempt to progress awaiting downloads
if request.get_state().is_awaiting_download() {
Expand All @@ -206,13 +280,16 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut()
.update_awaiting_download_status("no peers");
return Ok(());
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block, cx)? {
let request = R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

match request.make_request(id, peer_id, expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
Expand Down
67 changes: 1 addition & 66 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,45 +626,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_id: SingleLookupId,
peer_id: PeerId,
block_root: Hash256,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(block) = downloaded_block.or_else(|| {
// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
// latter handle the case where if the peer sent no blobs, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
//
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
// and get dropped as completed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());

// Check if we are in deneb, before peerdas and inside da window
if !self.chain.should_fetch_blobs(block_epoch) {
return Ok(LookupRequestResult::NoRequestNeeded("blobs not required"));
}

// No data required for this block
if expected_blobs == 0 {
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
}

let imported_blob_indexes = self
.chain
.data_availability_checker
Expand Down Expand Up @@ -754,35 +717,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
block_root: Hash256,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(block) =
downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
})
else {
// Wait to download the block before downloading columns. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible columns and
// latter handle the case where if the peer sent no columns, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());

// Check if we are into peerdas and inside da window
if !self.chain.should_fetch_custody_columns(block_epoch) {
return Ok(LookupRequestResult::NoRequestNeeded("columns not required"));
}

// No data required for this block
if expected_blobs == 0 {
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
}

let custody_indexes_imported = self
.chain
.data_availability_checker
Expand Down
Loading