From 043d52b7dfe0d270efe4c7bc36615279f849dad6 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 16 Sep 2024 13:39:47 +0200 Subject: [PATCH 1/9] Generalize sync ActiveRequests --- .../src/service/api_types.rs | 17 +- .../network/src/sync/block_lookups/tests.rs | 33 +-- beacon_node/network/src/sync/manager.rs | 16 +- .../network/src/sync/network_context.rs | 237 +++++++----------- .../src/sync/network_context/requests.rs | 168 ++++++++++++- .../network_context/requests/blobs_by_root.rs | 59 ++--- .../requests/blocks_by_root.rs | 43 ++-- .../requests/data_columns_by_root.rs | 58 +---- 8 files changed, 327 insertions(+), 304 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 30400db3b66..3e7d19bbdd0 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -29,11 +29,6 @@ pub struct SingleLookupReqId { pub req_id: Id, } -/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly. -/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id. -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct DataColumnsByRootRequestId(pub Id); - /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum SyncRequestId { @@ -42,11 +37,19 @@ pub enum SyncRequestId { /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. - DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester), + DataColumnsByRoot(DataColumnsByRootRequestId), /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } +/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly. +/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRootRequestId { + pub id: Id, + pub requester: DataColumnsByRootRequester, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), @@ -247,6 +250,6 @@ impl slog::Value for RequestId { impl std::fmt::Display for DataColumnsByRootRequestId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) + write!(f, "{}", self.id) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 5b4f17ac0dd..f50bd888c00 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -24,8 +24,8 @@ use beacon_chain::{ use beacon_processor::WorkEvent; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::service::api_types::{ - AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId, - SyncRequestId, + AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester, + SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; @@ -713,10 +713,10 @@ impl TestRig { let first_dc = data_columns.first().unwrap(); let block_root = first_dc.block_root(); let sampling_request_id = match id.0 { - SyncRequestId::DataColumnsByRoot( - _, - _requester @ DataColumnsByRootRequester::Sampling(sampling_id), - ) => sampling_id.sampling_request_id, + SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId { + requester: DataColumnsByRootRequester::Sampling(sampling_id), + .. + }) => sampling_id.sampling_request_id, _ => unreachable!(), }; self.complete_data_columns_by_root_request(id, data_columns); @@ -741,14 +741,15 @@ impl TestRig { data_columns: Vec>>, missing_components: bool, ) { - let lookup_id = - if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) = - ids.first().unwrap().0 - { - id.requester.0.lookup_id - } else { - panic!("not a custody requester") - }; + let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId { + requester: DataColumnsByRootRequester::Custody(id), + .. + }) = ids.first().unwrap().0 + { + id.requester.0.lookup_id + } else { + panic!("not a custody requester") + }; let first_column = data_columns.first().cloned().unwrap(); @@ -1339,7 +1340,7 @@ fn test_single_block_lookup_empty_response() { // The peer does not have the block. It should be penalized. r.single_lookup_block_response(id, peer_id, None); - r.expect_penalty(peer_id, "NoResponseReturned"); + r.expect_penalty(peer_id, "NotEnoughResponsesReturned"); // it should be retried let id = r.expect_block_lookup_request(block_root); // Send the right block this time. @@ -2698,7 +2699,7 @@ mod deneb_only { }; tester .empty_block_response() - .expect_penalty("NoResponseReturned") + .expect_penalty("NotEnoughResponsesReturned") .expect_block_request() .expect_no_blobs_request() .block_response_and_expect_blob_request() diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ed91c73d8bf..718206873bc 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -380,13 +380,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } - SyncRequestId::DataColumnsByRoot(req_id, requester) => self - .on_data_columns_by_root_response( - req_id, - requester, - peer_id, - RpcEvent::RPCError(error), - ), + SyncRequestId::DataColumnsByRoot(req_id) => { + self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { @@ -991,10 +987,9 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - SyncRequestId::DataColumnsByRoot(req_id, requester) => { + SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response( req_id, - requester, peer_id, match data_column { Some(data_column) => RpcEvent::Response(data_column, seen_timestamp), @@ -1036,7 +1031,6 @@ impl SyncManager { fn on_data_columns_by_root_response( &mut self, req_id: DataColumnsByRootRequestId, - requester: DataColumnsByRootRequester, peer_id: PeerId, data_column: RpcEvent>>, ) { @@ -1044,7 +1038,7 @@ impl SyncManager { self.network .on_data_columns_by_root_response(req_id, peer_id, data_column) { - match requester { + match req_id.requester { DataColumnsByRootRequester::Sampling(id) => { if let Some((requester, result)) = self.sampling diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b9f6d180c13..bfb9bd44a6f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,6 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -25,8 +24,11 @@ use lighthouse_network::service::api_types::{ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use rand::seq::SliceRandom; use rand::thread_rng; -use requests::ActiveDataColumnsByRootRequest; pub use requests::LookupVerifyError; +use requests::{ + ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems, + DataColumnsByRootRequestItems, +}; use slog::{debug, error, warn}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -164,18 +166,17 @@ pub struct SyncNetworkContext { request_id: Id, /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. - blocks_by_root_requests: FnvHashMap, - + blocks_by_root_requests: + ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. - blobs_by_root_requests: FnvHashMap>, + blobs_by_root_requests: ActiveRequests>, + /// A mapping of active DataColumnsByRoot requests + data_columns_by_root_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, - /// A mapping of active DataColumnsByRoot requests - data_columns_by_root_requests: - FnvHashMap>, - /// BlocksByRange requests paired with BlobsByRange range_block_components_requests: FnvHashMap)>, @@ -223,9 +224,17 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, - blocks_by_root_requests: <_>::default(), - blobs_by_root_requests: <_>::default(), - data_columns_by_root_requests: <_>::default(), + // true = enfore max_requests as returned for blocks_by_root. We always request a single + // block and the peer must have it. + blocks_by_root_requests: ActiveRequests::new(true, "blocks_by_root"), + // true = enfore max_requests are returned for blobs_by_root. We only issue requests for + // blocks after we know the block has data, and only request peers after they claim to + // have imported the block+blobs. + blobs_by_root_requests: ActiveRequests::new(true, "blobs_by_root"), + // true = enforce max_requests are returned data_columns_by_root. We only issue requests + // for blocks after we know the block has data, and only request peers after they claim to + // have imported the block+columns and claim to be custodians + data_columns_by_root_requests: ActiveRequests::new(true, "data_columns_by_root"), custody_by_root_requests: <_>::default(), range_block_components_requests: FnvHashMap::default(), network_beacon_processor, @@ -249,34 +258,19 @@ impl SyncNetworkContext { let failed_block_ids = self .blocks_by_root_requests - .iter() - .filter_map(|(id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlock { id: *id }) - } else { - None - } - }); + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SingleBlock { id: *id }); let failed_blob_ids = self .blobs_by_root_requests - .iter() - .filter_map(|(id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlob { id: *id }) - } else { - None - } - }); - let failed_data_column_by_root_ids = - self.data_columns_by_root_requests - .iter() - .filter_map(|(req_id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester)) - } else { - None - } - }); + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SingleBlob { id: *id }); + let failed_data_column_by_root_ids = self + .data_columns_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id)); failed_range_ids .chain(failed_block_ids) @@ -579,7 +573,7 @@ impl SyncNetworkContext { .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blocks_by_root_requests - .insert(id, ActiveBlocksByRootRequest::new(request, peer_id)); + .insert(id, peer_id, BlocksByRootRequestItems::new(request)); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -677,7 +671,7 @@ impl SyncNetworkContext { .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blobs_by_root_requests - .insert(id, ActiveBlobsByRootRequest::new(request, peer_id)); + .insert(id, peer_id, BlobsByRootRequestItems::new(request)); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -689,7 +683,10 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, ) -> Result, &'static str> { - let req_id = DataColumnsByRootRequestId(self.next_id()); + let req_id = DataColumnsByRootRequestId { + id: self.next_id(), + requester, + }; debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -704,12 +701,13 @@ impl SyncNetworkContext { self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)), - request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)), + request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)), })?; self.data_columns_by_root_requests.insert( req_id, - ActiveDataColumnsByRootRequest::new(request, peer_id, requester), + peer_id, + DataColumnsByRootRequestItems::new(request), ); Ok(LookupRequestResult::RequestSent(req_id)) @@ -917,142 +915,77 @@ impl SyncNetworkContext { // Request handlers - pub fn on_single_block_response( + pub(crate) fn on_single_block_response( &mut self, - request_id: SingleLookupReqId, + id: SingleLookupReqId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>> { - let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { - metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]); - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(block, seen_timestamp) => { - match request.get_mut().add_response(block) { - Ok(block) => Ok((block, seen_timestamp)), - Err(e) => { - // The request must be dropped after receiving an error. - request.remove(); - Err(e.into()) - } + let r = self.blocks_by_root_requests.on_response(id, rpc_event); + let r = match r { + // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the + // response count to at most 1. + Some(Ok((mut blocks, seen_timestamp))) => match blocks.pop() { + Some(block) => Some(Ok((block, seen_timestamp))), + // Should never happen, `blocks_by_root_requests` enforces that we receive at least + // 1 chunk. + None => Some(Err(LookupVerifyError::NotEnoughResponsesReturned { + actual: 0, } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - Err(e) => Err(e.into()), + .into())), }, - RpcEvent::RPCError(e) => { - request.remove(); - Err(e.into()) - } + Some(Err(e)) => Some(Err(e)), + None => None, }; - - if let Err(RpcResponseError::VerifyError(e)) = &resp { + if let Some(Err(RpcResponseError::VerifyError(e))) = &r { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - Some(resp) + r } - pub fn on_single_blob_response( + pub(crate) fn on_single_blob_response( &mut self, - request_id: SingleLookupReqId, + id: SingleLookupReqId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { - let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { - metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]); - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(blob, seen_timestamp) => { - let request = request.get_mut(); - match request.add_response(blob) { - Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, seen_timestamp)) - .map_err(|e| (e.into(), request.resolve())), - Ok(None) => return None, - Err(e) => Err((e.into(), request.resolve())), - } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - // (err, false = not resolved) because terminate returns Ok() if resolved - Err(e) => Err((e.into(), false)), + let r = self.blobs_by_root_requests.on_response(id, rpc_event); + let r = match r { + Some(Ok((blobs, seen_timestamp))) => match to_fixed_blob_sidecar_list(blobs) { + Ok(blobs) => Some(Ok((blobs, seen_timestamp))), + Err(e) => Some(Err(e.into())), }, - RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())), + Some(Err(e)) => Some(Err(e)), + None => None, }; - - match resp { - Ok(resp) => Some(Ok(resp)), - // Track if this request has already returned some value downstream. Ensure that - // downstream code only receives a single Result per request. If the serving peer does - // multiple penalizable actions per request, downscore and return None. This allows to - // catch if a peer is returning more blobs than requested or if the excess blobs are - // invalid. - Err((e, resolved)) => { - if let RpcResponseError::VerifyError(e) = &e { - self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - if resolved { - None - } else { - Some(Err(e)) - } - } + if let Some(Err(RpcResponseError::VerifyError(e))) = &r { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } + r } #[allow(clippy::type_complexity)] - pub fn on_data_columns_by_root_response( + pub(crate) fn on_data_columns_by_root_response( &mut self, id: DataColumnsByRootRequestId, - _peer_id: PeerId, + peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>>> { - let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(data_column, seen_timestamp) => { - let request = request.get_mut(); - match request.add_response(data_column) { - Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)), - Ok(None) => return None, - Err(e) => Err((e.into(), request.resolve())), - } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - // (err, false = not resolved) because terminate returns Ok() if resolved - Err(e) => Err((e.into(), false)), - }, - RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())), - }; + let resp = self + .data_columns_by_root_requests + .on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } - match resp { - Ok(resp) => Some(Ok(resp)), - // Track if this request has already returned some value downstream. Ensure that - // downstream code only receives a single Result per request. If the serving peer does - // multiple penalizable actions per request, downscore and return None. This allows to - // catch if a peer is returning more columns than requested or if the excess blobs are - // invalid. - Err((e, resolved)) => { - if let RpcResponseError::VerifyError(_e) = &e { - // TODO(das): this is a bug, we should not penalise peer in this case. - // confirm this can be removed. - // self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - if resolved { - None - } else { - Some(Err(e)) - } - } + fn report_rpc_response_errors( + &mut self, + resp: Option>, + peer_id: PeerId, + ) -> Option> { + if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } + resp } /// Insert a downloaded column into an active custody request. Then make progress on the diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 0c2f59d143f..16c4cde44d4 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,23 +1,181 @@ +use std::{collections::hash_map::Entry, hash::Hash}; + +use beacon_chain::validator_monitor::timestamp_now; +use fnv::FnvHashMap; +use lighthouse_network::PeerId; use strum::IntoStaticStr; use types::Hash256; -pub use blobs_by_root::{ActiveBlobsByRootRequest, BlobsByRootSingleBlockRequest}; -pub use blocks_by_root::{ActiveBlocksByRootRequest, BlocksByRootSingleRequest}; +pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest}; +pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest}; pub use data_columns_by_root::{ - ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest, + DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +use crate::metrics; + +use super::{RpcEvent, RpcResponseResult}; + mod blobs_by_root; mod blocks_by_root; mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { - NoResponseReturned, - NotEnoughResponsesReturned { expected: usize, actual: usize }, + NotEnoughResponsesReturned { actual: usize }, TooManyResponses, UnrequestedBlockRoot(Hash256), UnrequestedIndex(u64), InvalidInclusionProof, DuplicateData, } + +/// Collection of active requests of a single ReqResp method, i.e. `blocks_by_root` +pub struct ActiveRequests { + requests: FnvHashMap>, + name: &'static str, + expect_max_responses: bool, +} + +/// Stateful container for a single active ReqResp request +struct ActiveRequest { + state: State, + peer_id: PeerId, +} + +enum State { + Active(T), + CompletedEarly, + Errored, +} + +impl ActiveRequests { + pub fn new(expect_max_responses: bool, name: &'static str) -> Self { + Self { + requests: <_>::default(), + name, + expect_max_responses, + } + } + + pub fn insert(&mut self, id: K, peer_id: PeerId, items: T) { + self.requests.insert( + id, + ActiveRequest { + state: State::Active(items), + peer_id, + }, + ); + } + + /// Handle an `RpcEvent` for a specific request index by `id`. + /// + /// Lighthouse ReqResp protocol API promises to send 0 or more `RpcEvent::Response` chunks, + /// and EITHER a single `RpcEvent::RPCError` or RpcEvent::StreamTermination. + /// + /// Downstream code expects to receive a single `Result` value per request ID. However, + /// `add_item` may convert ReqResp success chunks into errors. This function handles the + /// multiple errors / stream termination internally ensuring that a single `Some` is + /// returned. + pub fn on_response( + &mut self, + id: K, + rpc_event: RpcEvent, + ) -> Option>> { + let Entry::Occupied(mut request) = self.requests.entry(id) else { + metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &[self.name]); + return None; + }; + + match rpc_event { + // Handler of a success ReqResp chunk. Adds the item to the request accumulator. + // `ActiveRequestItems` validates the item before appending to its internal state. + RpcEvent::Response(item, seen_timestamp) => { + let request = &mut request.get_mut(); + match &mut request.state { + State::Active(items) => { + match items.add(item) { + // Received all items we are expecting for, return early, but keep the request + // struct to handle the stream termination gracefully. + Ok(true) => { + let items = items.consume(); + request.state = State::CompletedEarly; + Some(Ok((items, seen_timestamp))) + } + // Received item, but we are still expecting more + Ok(false) => None, + // Received an invalid item + Err(e) => { + request.state = State::Errored; + Some(Err(e.into())) + } + } + } + // Should never happen, ReqResp network behaviour enforces a max count of chunks + State::CompletedEarly => None, + // Ignore items after errors. We may want to penalize repeated invalid chunks + // for the same response. But that's an optimization to ban peers sending + // invalid data faster that we choose to not adopt for now. + State::Errored => None, + } + } + RpcEvent::StreamTermination => { + // After stream termination we must forget about this request, there will be no more + // messages coming from the network + match request.remove().state { + // Received a stream termination in a valid sequence, consume items + State::Active(mut items) => { + if self.expect_max_responses { + Some(Err(LookupVerifyError::NotEnoughResponsesReturned { + actual: items.consume().len(), + } + .into())) + } else { + Some(Ok((items.consume(), timestamp_now()))) + } + } + // Items already returned, ignore stream termination + State::CompletedEarly => None, + // Returned an error earlier, ignore stream termination + State::Errored => None, + } + } + RpcEvent::RPCError(e) => { + // After an Error event from the network we must forget about this request as this + // may be the last message for this request. + match request.remove().state { + // Received error while request is still active, propagate error. + State::Active(_) => Some(Err(e.into())), + // Received error after completing the request, ignore the error. This is okay + // because the network has already registered a downscore event if necessary for + // this message. + State::CompletedEarly => None, + // Received a network error after a validity error. Okay to ignore, see above + State::Errored => None, + } + } + } + } + + pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> { + self.requests + .iter() + .filter(|(_, request)| &request.peer_id == peer_id) + .map(|(id, _)| id) + .collect() + } + + pub fn len(&self) -> usize { + self.requests.len() + } +} + +pub trait ActiveRequestItems { + type Item; + + /// Add a new item into the accumulator. Returns true if all expected items have been received. + fn add(&mut self, item: Self::Item) -> Result; + + /// Return all accumulated items consuming them. + fn consume(&mut self) -> Vec; +} diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index cb2b1a42ec4..fefb27a5efc 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -1,8 +1,8 @@ -use lighthouse_network::{rpc::methods::BlobsByRootRequest, PeerId}; +use lighthouse_network::rpc::methods::BlobsByRootRequest; use std::sync::Arc; use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Clone)] pub struct BlobsByRootSingleBlockRequest { @@ -25,34 +25,27 @@ impl BlobsByRootSingleBlockRequest { } } -pub struct ActiveBlobsByRootRequest { +pub struct BlobsByRootRequestItems { request: BlobsByRootSingleBlockRequest, - blobs: Vec>>, - resolved: bool, - pub(crate) peer_id: PeerId, + items: Vec>>, } -impl ActiveBlobsByRootRequest { - pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self { +impl BlobsByRootRequestItems { + pub fn new(request: BlobsByRootSingleBlockRequest) -> Self { Self { request, - blobs: vec![], - resolved: false, - peer_id, + items: vec![], } } +} + +impl ActiveRequestItems for BlobsByRootRequestItems { + type Item = Arc>; /// Appends a chunk to this multi-item request. If all expected chunks are received, this /// method returns `Some`, resolving the request before the stream terminator. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - blob: Arc>, - ) -> Result>>>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, blob: Self::Item) -> Result { let block_root = blob.block_root(); if self.request.block_root != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); @@ -63,34 +56,16 @@ impl ActiveBlobsByRootRequest { if !self.request.indices.contains(&blob.index) { return Err(LookupVerifyError::UnrequestedIndex(blob.index)); } - if self.blobs.iter().any(|b| b.index == blob.index) { + if self.items.iter().any(|b| b.index == blob.index) { return Err(LookupVerifyError::DuplicateData); } - self.blobs.push(blob); - if self.blobs.len() >= self.request.indices.len() { - // All expected chunks received, return result early - self.resolved = true; - Ok(Some(std::mem::take(&mut self.blobs))) - } else { - Ok(None) - } - } + self.items.push(blob); - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NotEnoughResponsesReturned { - expected: self.request.indices.len(), - actual: self.blobs.len(), - }) - } + Ok(self.items.len() >= self.request.indices.len()) } - /// Mark request as resolved (= has returned something downstream) while marking this status as - /// true for future calls. - pub fn resolve(&mut self) -> bool { - std::mem::replace(&mut self.resolved, true) + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs index a15d4e39353..f3cdcbe714f 100644 --- a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs @@ -1,9 +1,9 @@ use beacon_chain::get_block_root; -use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; +use lighthouse_network::rpc::BlocksByRootRequest; use std::sync::Arc; use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Copy, Clone)] pub struct BlocksByRootSingleRequest(pub Hash256); @@ -14,47 +14,38 @@ impl BlocksByRootSingleRequest { } } -pub struct ActiveBlocksByRootRequest { +pub struct BlocksByRootRequestItems { request: BlocksByRootSingleRequest, - resolved: bool, - pub(crate) peer_id: PeerId, + items: Vec>>, } -impl ActiveBlocksByRootRequest { - pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self { +impl BlocksByRootRequestItems { + pub fn new(request: BlocksByRootSingleRequest) -> Self { Self { request, - resolved: false, - peer_id, + items: vec![], } } +} + +impl ActiveRequestItems for BlocksByRootRequestItems { + type Item = Arc>; /// Append a response to the single chunk request. If the chunk is valid, the request is /// resolved immediately. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - block: Arc>, - ) -> Result>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, block: Self::Item) -> Result { let block_root = get_block_root(&block); if self.request.0 != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } - // Valid data, blocks by root expects a single response - self.resolved = true; - Ok(block) + self.items.push(block); + // Always returns true, blocks by root expects a single response + Ok(true) } - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NoResponseReturned) - } + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index a42ae7ca41f..1b8d46ff072 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -1,9 +1,8 @@ -use lighthouse_network::service::api_types::DataColumnsByRootRequester; -use lighthouse_network::{rpc::methods::DataColumnsByRootRequest, PeerId}; +use lighthouse_network::rpc::methods::DataColumnsByRootRequest; use std::sync::Arc; use types::{ChainSpec, DataColumnIdentifier, DataColumnSidecar, EthSpec, Hash256}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Clone)] pub struct DataColumnsByRootSingleBlockRequest { @@ -26,40 +25,27 @@ impl DataColumnsByRootSingleBlockRequest { } } -pub struct ActiveDataColumnsByRootRequest { +pub struct DataColumnsByRootRequestItems { request: DataColumnsByRootSingleBlockRequest, items: Vec>>, - resolved: bool, - pub(crate) peer_id: PeerId, - pub(crate) requester: DataColumnsByRootRequester, } -impl ActiveDataColumnsByRootRequest { - pub fn new( - request: DataColumnsByRootSingleBlockRequest, - peer_id: PeerId, - requester: DataColumnsByRootRequester, - ) -> Self { +impl DataColumnsByRootRequestItems { + pub fn new(request: DataColumnsByRootSingleBlockRequest) -> Self { Self { request, items: vec![], - resolved: false, - peer_id, - requester, } } +} + +impl ActiveRequestItems for DataColumnsByRootRequestItems { + type Item = Arc>; /// Appends a chunk to this multi-item request. If all expected chunks are received, this /// method returns `Some`, resolving the request before the stream terminator. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - data_column: Arc>, - ) -> Result>>>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, data_column: Self::Item) -> Result { let block_root = data_column.block_root(); if self.request.block_root != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); @@ -75,29 +61,11 @@ impl ActiveDataColumnsByRootRequest { } self.items.push(data_column); - if self.items.len() >= self.request.indices.len() { - // All expected chunks received, return result early - self.resolved = true; - Ok(Some(std::mem::take(&mut self.items))) - } else { - Ok(None) - } - } - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NotEnoughResponsesReturned { - expected: self.request.indices.len(), - actual: self.items.len(), - }) - } + Ok(self.items.len() >= self.request.indices.len()) } - /// Mark request as resolved (= has returned something downstream) while marking this status as - /// true for future calls. - pub fn resolve(&mut self) -> bool { - std::mem::replace(&mut self.resolved, true) + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } From 9446569a8adff0e44e305a8e5128d825b45f4f6d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 27 Sep 2024 19:19:53 +0200 Subject: [PATCH 2/9] Remove impossible to hit test --- .../network/src/sync/block_lookups/tests.rs | 21 +------------------ .../src/sync/network_context/requests.rs | 12 +++++++---- 2 files changed, 9 insertions(+), 24 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index f50bd888c00..0aec00d746b 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1153,6 +1153,7 @@ impl TestRig { penalty_msg, expect_penalty_msg, "Unexpected penalty msg for {peer_id}" ); + self.log(&format!("Found expected penalty {penalty_msg}")); } pub fn expect_single_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) { @@ -2551,11 +2552,6 @@ mod deneb_only { self.blobs.pop().expect("blobs"); self } - fn invalidate_blobs_too_many(mut self) -> Self { - let first_blob = self.blobs.first().expect("blob").clone(); - self.blobs.push(first_blob); - self - } fn expect_block_process(mut self) -> Self { self.rig.expect_block_process(ResponseType::Block); self @@ -2644,21 +2640,6 @@ mod deneb_only { .expect_no_block_request(); } - #[test] - fn single_block_response_then_too_many_blobs_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .block_response_triggering_process() - .invalidate_blobs_too_many() - .blobs_response() - .expect_penalty("TooManyResponses") - // Network context returns "download success" because the request has enough blobs + it - // downscores the peer for returning too many. - .expect_no_block_request(); - } - // Test peer returning block that has unknown parent, and a new lookup is created #[test] fn parent_block_unknown_parent() { diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 16c4cde44d4..9977e1358f2 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -82,7 +82,7 @@ impl ActiveRequests { id: K, rpc_event: RpcEvent, ) -> Option>> { - let Entry::Occupied(mut request) = self.requests.entry(id) else { + let Entry::Occupied(mut entry) = self.requests.entry(id) else { metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &[self.name]); return None; }; @@ -91,7 +91,7 @@ impl ActiveRequests { // Handler of a success ReqResp chunk. Adds the item to the request accumulator. // `ActiveRequestItems` validates the item before appending to its internal state. RpcEvent::Response(item, seen_timestamp) => { - let request = &mut request.get_mut(); + let request = &mut entry.get_mut(); match &mut request.state { State::Active(items) => { match items.add(item) { @@ -112,6 +112,10 @@ impl ActiveRequests { } } // Should never happen, ReqResp network behaviour enforces a max count of chunks + // When `max_remaining_chunks <= 1` a the inbound stream in terminated in + // `rpc/handler.rs`. Handling this case adds complexity for no gain. Even if an + // attacker could abuse this, there's no gain in sending garbage chunks that + // will be ignored anyway. State::CompletedEarly => None, // Ignore items after errors. We may want to penalize repeated invalid chunks // for the same response. But that's an optimization to ban peers sending @@ -122,7 +126,7 @@ impl ActiveRequests { RpcEvent::StreamTermination => { // After stream termination we must forget about this request, there will be no more // messages coming from the network - match request.remove().state { + match entry.remove().state { // Received a stream termination in a valid sequence, consume items State::Active(mut items) => { if self.expect_max_responses { @@ -143,7 +147,7 @@ impl ActiveRequests { RpcEvent::RPCError(e) => { // After an Error event from the network we must forget about this request as this // may be the last message for this request. - match request.remove().state { + match entry.remove().state { // Received error while request is still active, propagate error. State::Active(_) => Some(Err(e.into())), // Received error after completing the request, ignore the error. This is okay From b9cd5ad88272503a8043c3fa89baac1928479eab Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 3 Oct 2024 23:43:15 +0300 Subject: [PATCH 3/9] Update beacon_node/lighthouse_network/src/service/api_types.rs Co-authored-by: realbigsean --- beacon_node/lighthouse_network/src/service/api_types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 3e7d19bbdd0..f0824ef4bc3 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -42,8 +42,8 @@ pub enum SyncRequestId { RangeBlockAndBlobs { id: Id }, } -/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly. -/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id. +/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. +/// Wrapping this particular req_id, ensures not mixing this request with a custody req_id. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct DataColumnsByRootRequestId { pub id: Id, From 715ba8c53bf5ca61204ba2fcbf887bc7600ba698 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 3 Oct 2024 23:43:30 +0300 Subject: [PATCH 4/9] Update beacon_node/network/src/sync/network_context.rs Co-authored-by: realbigsean --- beacon_node/network/src/sync/network_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index bfb9bd44a6f..66523148193 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -224,7 +224,7 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, - // true = enfore max_requests as returned for blocks_by_root. We always request a single + // true = enforce max_requests as returned for blocks_by_root. We always request a single // block and the peer must have it. blocks_by_root_requests: ActiveRequests::new(true, "blocks_by_root"), // true = enfore max_requests are returned for blobs_by_root. We only issue requests for From 0d593b8295ba6f52bfcfb69acfc6bf239e86f6be Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 3 Oct 2024 23:43:37 +0300 Subject: [PATCH 5/9] Update beacon_node/network/src/sync/network_context.rs Co-authored-by: realbigsean --- beacon_node/network/src/sync/network_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 66523148193..c646e049d29 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -227,7 +227,7 @@ impl SyncNetworkContext { // true = enforce max_requests as returned for blocks_by_root. We always request a single // block and the peer must have it. blocks_by_root_requests: ActiveRequests::new(true, "blocks_by_root"), - // true = enfore max_requests are returned for blobs_by_root. We only issue requests for + // true = enforce max_requests are returned for blobs_by_root. We only issue requests for // blocks after we know the block has data, and only request peers after they claim to // have imported the block+blobs. blobs_by_root_requests: ActiveRequests::new(true, "blobs_by_root"), From 1270322ddd2a4e61fd20a0e76279ca9ab4a83cb3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 3 Oct 2024 23:58:19 +0300 Subject: [PATCH 6/9] Simplify match --- .../network/src/sync/network_context.rs | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c646e049d29..70e4ef4a3bb 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -921,26 +921,23 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>> { - let r = self.blocks_by_root_requests.on_response(id, rpc_event); - let r = match r { - // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the - // response count to at most 1. - Some(Ok((mut blocks, seen_timestamp))) => match blocks.pop() { - Some(block) => Some(Ok((block, seen_timestamp))), - // Should never happen, `blocks_by_root_requests` enforces that we receive at least - // 1 chunk. - None => Some(Err(LookupVerifyError::NotEnoughResponsesReturned { - actual: 0, + let response = self.blocks_by_root_requests.on_response(id, rpc_event); + let response = response.map(|res| { + res.and_then(|(mut blocks, seen_timestamp)| { + // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the + // response count to at most 1. + match blocks.pop() { + Some(block) => Ok((block, seen_timestamp)), + // Should never happen, `blocks_by_root_requests` enforces that we receive at least + // 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), } - .into())), - }, - Some(Err(e)) => Some(Err(e)), - None => None, - }; - if let Some(Err(RpcResponseError::VerifyError(e))) = &r { + }) + }); + if let Some(Err(RpcResponseError::VerifyError(e))) = &response { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - r + response } pub(crate) fn on_single_blob_response( @@ -949,19 +946,19 @@ impl SyncNetworkContext { peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { - let r = self.blobs_by_root_requests.on_response(id, rpc_event); - let r = match r { - Some(Ok((blobs, seen_timestamp))) => match to_fixed_blob_sidecar_list(blobs) { - Ok(blobs) => Some(Ok((blobs, seen_timestamp))), - Err(e) => Some(Err(e.into())), - }, - Some(Err(e)) => Some(Err(e)), - None => None, - }; - if let Some(Err(RpcResponseError::VerifyError(e))) = &r { + let response = self.blobs_by_root_requests.on_response(id, rpc_event); + let response = response.map(|res| { + res.and_then( + |(blobs, seen_timestamp)| match to_fixed_blob_sidecar_list(blobs) { + Ok(blobs) => Ok((blobs, seen_timestamp)), + Err(e) => Err(e.into()), + }, + ) + }); + if let Some(Err(RpcResponseError::VerifyError(e))) = &response { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - r + response } #[allow(clippy::type_complexity)] From 1910bf6d485a77d5cc7f32c2f2c1e31a1432b85f Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 4 Oct 2024 00:17:54 +0300 Subject: [PATCH 7/9] Fix display --- beacon_node/lighthouse_network/src/service/api_types.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index f0824ef4bc3..ea695d4e94c 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -248,8 +248,9 @@ impl slog::Value for RequestId { } } +// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log impl std::fmt::Display for DataColumnsByRootRequestId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.id) + write!(f, "{} {:?}", self.id, self.requester) } } From af35a272549b9a1d51204dece73dfc38115e97df Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 5 Oct 2024 15:39:32 +0300 Subject: [PATCH 8/9] Sampling requests should not expect all responses --- .../network/src/sync/network_context.rs | 37 +++++++++++-------- .../src/sync/network_context/custody.rs | 4 ++ .../src/sync/network_context/requests.rs | 14 ++++--- beacon_node/network/src/sync/sampling.rs | 18 ++++++++- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index f77b7ebd7da..a4905c93eba 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -239,17 +239,9 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, - // true = enforce max_requests as returned for blocks_by_root. We always request a single - // block and the peer must have it. - blocks_by_root_requests: ActiveRequests::new(true, "blocks_by_root"), - // true = enforce max_requests are returned for blobs_by_root. We only issue requests for - // blocks after we know the block has data, and only request peers after they claim to - // have imported the block+blobs. - blobs_by_root_requests: ActiveRequests::new(true, "blobs_by_root"), - // true = enforce max_requests are returned data_columns_by_root. We only issue requests - // for blocks after we know the block has data, and only request peers after they claim to - // have imported the block+columns and claim to be custodians - data_columns_by_root_requests: ActiveRequests::new(true, "data_columns_by_root"), + blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), + blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), custody_by_root_requests: <_>::default(), range_block_components_requests: FnvHashMap::default(), network_beacon_processor, @@ -603,8 +595,14 @@ impl SyncNetworkContext { }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; - self.blocks_by_root_requests - .insert(id, peer_id, BlocksByRootRequestItems::new(request)); + self.blocks_by_root_requests.insert( + id, + peer_id, + // true = enforce max_requests as returned for blocks_by_root. We always request a single + // block and the peer must have it. + true, + BlocksByRootRequestItems::new(request), + ); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -701,8 +699,15 @@ impl SyncNetworkContext { }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; - self.blobs_by_root_requests - .insert(id, peer_id, BlobsByRootRequestItems::new(request)); + self.blobs_by_root_requests.insert( + id, + peer_id, + // true = enforce max_requests are returned for blobs_by_root. We only issue requests for + // blocks after we know the block has data, and only request peers after they claim to + // have imported the block+blobs. + true, + BlobsByRootRequestItems::new(request), + ); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -713,6 +718,7 @@ impl SyncNetworkContext { requester: DataColumnsByRootRequester, peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, + expect_max_responses: bool, ) -> Result, &'static str> { let req_id = DataColumnsByRootRequestId { id: self.next_id(), @@ -738,6 +744,7 @@ impl SyncNetworkContext { self.data_columns_by_root_requests.insert( req_id, peer_id, + expect_max_responses, DataColumnsByRootRequestItems::new(request), ); diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 6736bfb82f0..e4bce3dafcd 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -283,6 +283,10 @@ impl ActiveCustodyRequest { block_root: self.block_root, indices: indices.clone(), }, + // true = enforce max_requests are returned data_columns_by_root. We only issue requests + // for blocks after we know the block has data, and only request peers after they claim to + // have imported the block+columns and claim to be custodians + true, ) .map_err(Error::SendFailed)?; diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 9977e1358f2..b9214bafcd7 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -34,13 +34,14 @@ pub enum LookupVerifyError { pub struct ActiveRequests { requests: FnvHashMap>, name: &'static str, - expect_max_responses: bool, } /// Stateful container for a single active ReqResp request struct ActiveRequest { state: State, peer_id: PeerId, + // Error if the request terminates before receiving max expected responses + expect_max_responses: bool, } enum State { @@ -50,20 +51,20 @@ enum State { } impl ActiveRequests { - pub fn new(expect_max_responses: bool, name: &'static str) -> Self { + pub fn new(name: &'static str) -> Self { Self { requests: <_>::default(), name, - expect_max_responses, } } - pub fn insert(&mut self, id: K, peer_id: PeerId, items: T) { + pub fn insert(&mut self, id: K, peer_id: PeerId, expect_max_responses: bool, items: T) { self.requests.insert( id, ActiveRequest { state: State::Active(items), peer_id, + expect_max_responses, }, ); } @@ -126,10 +127,11 @@ impl ActiveRequests { RpcEvent::StreamTermination => { // After stream termination we must forget about this request, there will be no more // messages coming from the network - match entry.remove().state { + let request = entry.remove(); + match request.state { // Received a stream termination in a valid sequence, consume items State::Active(mut items) => { - if self.expect_max_responses { + if request.expect_max_responses { Some(Err(LookupVerifyError::NotEnoughResponsesReturned { actual: items.consume().len(), } diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 4d0fa509cd5..202b7ee2a26 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -74,7 +74,11 @@ impl Sampling { } }; - debug!(self.log, "Created new sample request"; "id" => ?id); + debug!(self.log, + "Created new sample request"; + "id" => ?id, + "column_selection" => ?request.column_selection() + ); // TOOD(das): If a node has very little peers, continue_sampling() will attempt to find enough // to sample here, immediately failing the sampling request. There should be some grace @@ -220,6 +224,14 @@ impl ActiveSamplingRequest { } } + pub(crate) fn column_selection(&self) -> Vec { + self.column_shuffle + .iter() + .take(REQUIRED_SUCCESSES[0]) + .copied() + .collect() + } + /// Insert a downloaded column into an active sampling request. Then make progress on the /// entire request. /// @@ -508,6 +520,10 @@ impl ActiveSamplingRequest { block_root: self.block_root, indices: column_indexes.clone(), }, + // false = We issue request to custodians who may or may not have received the + // samples yet. We don't any signal (like an attestation or status messages that the + // custodian has received data). + false, ) .map_err(SamplingError::SendFailed)?; self.column_indexes_by_sampling_request From a0844e1219d1e577fa39fd22be95aa6b2b5057ce Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 17 Oct 2024 00:21:20 +0300 Subject: [PATCH 9/9] Fix sampling_batch_requests_not_enough_responses_returned test --- beacon_node/network/src/sync/block_lookups/tests.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 776b3a316f7..3dc897e77d3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -2094,7 +2094,7 @@ fn sampling_batch_requests_not_enough_responses_returned() { r.assert_sampling_request_status(block_root, &column_indexes, &vec![]); // Split the indexes to simulate the case where the supernode doesn't have the requested column. - let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) = + let (column_indexes_supernode_does_not_have, column_indexes_to_complete) = column_indexes.split_at(1); // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs. @@ -2109,7 +2109,11 @@ fn sampling_batch_requests_not_enough_responses_returned() { ); // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses. - r.assert_sampling_request_status(block_root, &vec![], &column_indexes); + r.assert_sampling_request_status( + block_root, + &vec![], + &column_indexes_supernode_does_not_have.to_vec(), + ); // The sampling request stalls. r.expect_empty_network();