Skip to content

Commit

Permalink
Generalize sync ActiveRequests
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Sep 17, 2024
1 parent e0ccadb commit 043d52b
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 304 deletions.
17 changes: 10 additions & 7 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ pub struct SingleLookupReqId {
pub req_id: Id,
}

/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly.
/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRootRequestId(pub Id);

/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum SyncRequestId {
Expand All @@ -42,11 +37,19 @@ pub enum SyncRequestId {
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester),
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
}

/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly.
/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRootRequestId {
pub id: Id,
pub requester: DataColumnsByRootRequester,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Expand Down Expand Up @@ -247,6 +250,6 @@ impl slog::Value for RequestId {

impl std::fmt::Display for DataColumnsByRootRequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
write!(f, "{}", self.id)
}
}
33 changes: 17 additions & 16 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use beacon_chain::{
use beacon_processor::WorkEvent;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::service::api_types::{
AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId,
SyncRequestId,
AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester,
SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::types::SyncState;
use lighthouse_network::{NetworkGlobals, Request};
Expand Down Expand Up @@ -713,10 +713,10 @@ impl TestRig {
let first_dc = data_columns.first().unwrap();
let block_root = first_dc.block_root();
let sampling_request_id = match id.0 {
SyncRequestId::DataColumnsByRoot(
_,
_requester @ DataColumnsByRootRequester::Sampling(sampling_id),
) => sampling_id.sampling_request_id,
SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
requester: DataColumnsByRootRequester::Sampling(sampling_id),
..
}) => sampling_id.sampling_request_id,
_ => unreachable!(),
};
self.complete_data_columns_by_root_request(id, data_columns);
Expand All @@ -741,14 +741,15 @@ impl TestRig {
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
missing_components: bool,
) {
let lookup_id =
if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) =
ids.first().unwrap().0
{
id.requester.0.lookup_id
} else {
panic!("not a custody requester")
};
let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
requester: DataColumnsByRootRequester::Custody(id),
..
}) = ids.first().unwrap().0
{
id.requester.0.lookup_id
} else {
panic!("not a custody requester")
};

let first_column = data_columns.first().cloned().unwrap();

Expand Down Expand Up @@ -1339,7 +1340,7 @@ fn test_single_block_lookup_empty_response() {

// The peer does not have the block. It should be penalized.
r.single_lookup_block_response(id, peer_id, None);
r.expect_penalty(peer_id, "NoResponseReturned");
r.expect_penalty(peer_id, "NotEnoughResponsesReturned");
// it should be retried
let id = r.expect_block_lookup_request(block_root);
// Send the right block this time.
Expand Down Expand Up @@ -2698,7 +2699,7 @@ mod deneb_only {
};
tester
.empty_block_response()
.expect_penalty("NoResponseReturned")
.expect_penalty("NotEnoughResponsesReturned")
.expect_block_request()
.expect_no_blobs_request()
.block_response_and_expect_blob_request()
Expand Down
16 changes: 5 additions & 11 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::DataColumnsByRoot(req_id, requester) => self
.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
RpcEvent::RPCError(error),
),
SyncRequestId::DataColumnsByRoot(req_id) => {
self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::RangeBlockAndBlobs { id } => {
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
Expand Down Expand Up @@ -991,10 +987,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
SyncRequestId::DataColumnsByRoot(req_id, requester) => {
SyncRequestId::DataColumnsByRoot(req_id) => {
self.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
match data_column {
Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
Expand Down Expand Up @@ -1036,15 +1031,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn on_data_columns_by_root_response(
&mut self,
req_id: DataColumnsByRootRequestId,
requester: DataColumnsByRootRequester,
peer_id: PeerId,
data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) {
if let Some(resp) =
self.network
.on_data_columns_by_root_response(req_id, peer_id, data_column)
{
match requester {
match req_id.requester {
DataColumnsByRootRequester::Sampling(id) => {
if let Some((requester, result)) =
self.sampling
Expand Down
Loading

0 comments on commit 043d52b

Please sign in to comment.