Skip to content

Commit

Permalink
fix Rpc Ping sequence number (#6408)
Browse files Browse the repository at this point in the history
* fix Rpc Ping sequence number

* bubble up Outbound Err's and Responses even if the peer disconnected

* send pings via Rpc from main network

* add comment to connected check

* Merge branch 'unstable' into fix-ping-seq-number
  • Loading branch information
jxs authored Sep 26, 2024
1 parent 50d8375 commit 5d1ff7c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 94 deletions.
24 changes: 2 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCResponse, RPCSend, ReqId};
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
Expand All @@ -14,8 +14,7 @@ use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::{ConnectionId, Stream};
use libp2p::PeerId;
use libp2p::swarm::Stream;
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -89,12 +88,6 @@ pub struct RPCHandler<Id, E>
where
E: EthSpec,
{
/// This `ConnectionId`.
id: ConnectionId,

/// The matching `PeerId` of this connection.
peer_id: PeerId,

/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,

Expand Down Expand Up @@ -225,16 +218,12 @@ where
E: EthSpec,
{
pub fn new(
id: ConnectionId,
peer_id: PeerId,
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
) -> Self {
RPCHandler {
id,
peer_id,
listen_protocol,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
Expand Down Expand Up @@ -903,15 +892,6 @@ where
self.shutdown(None);
}

// If we received a Ping, we queue a Pong response.
if let InboundRequest::Ping(ping) = req {
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id);
self.send_response(
self.current_inbound_substream_id,
RPCCodedResponse::Success(RPCResponse::Pong(ping)),
);
}

self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
Expand Down
57 changes: 38 additions & 19 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use libp2p::swarm::{
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use slog::{crit, debug, o};
use slog::{crit, debug, o, trace};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -132,6 +132,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
log: slog::Logger,
/// Networking constant values
network_params: NetworkParams,
/// A sequential counter indicating when data gets modified.
seq_number: u64,
}

impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
Expand All @@ -142,6 +144,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
log: slog::Logger,
network_params: NetworkParams,
seq_number: u64,
) -> Self {
let log = log.new(o!("service" => "libp2p_rpc"));

Expand All @@ -163,6 +166,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
enable_light_client_server,
log,
network_params,
seq_number,
}
}

Expand Down Expand Up @@ -214,6 +218,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
event: RPCSend::Shutdown(id, reason),
});
}

pub fn update_seq_number(&mut self, seq_number: u64) {
self.seq_number = seq_number
}

/// Send a Ping request to the destination `PeerId` via `ConnectionId`.
pub fn ping(&mut self, peer_id: PeerId, id: Id) {
let ping = Ping {
data: self.seq_number,
};
trace!(self.log, "Sending Ping"; "peer_id" => %peer_id);
self.send_request(peer_id, id, OutboundRequest::Ping(ping));
}
}

impl<Id, E> NetworkBehaviour for RPC<Id, E>
Expand Down Expand Up @@ -245,8 +262,6 @@ where
.log
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
Expand Down Expand Up @@ -280,8 +295,6 @@ where
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));

let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
Expand Down Expand Up @@ -359,14 +372,6 @@ where
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, &req) {
Ok(()) => {
// send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
message: Ok(RPCReceived::Request(id, req)),
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.versioned_protocol().protocol();
Expand Down Expand Up @@ -394,6 +399,7 @@ where
"Rate limited. Request too large".into(),
),
);
return;
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
Expand All @@ -408,16 +414,29 @@ where
format!("Wait {:?}", wait_time).into(),
),
);
return;
}
// No rate limiting, continue.
Ok(_) => {}
}
} else {
// No rate limiting, send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
}
// If we received a Ping, we queue a Pong response.
if let InboundRequest::Ping(_) = req {
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %conn_id, "peer_id" => %peer_id);
self.send_response(
peer_id,
conn_id,
message: Ok(RPCReceived::Request(id, req)),
}))
(conn_id, id),
RPCCodedResponse::Success(RPCResponse::Pong(Ping {
data: self.seq_number,
})),
);
}

self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
message: Ok(RPCReceived::Request(id, req)),
}));
}
HandlerEvent::Ok(rpc) => {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
Expand Down
102 changes: 49 additions & 53 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,38 +159,36 @@ impl<E: EthSpec> Network<E> {
.collect();

// set up a collection of variables accessible outside of the network crate
let network_globals = {
// Create an ENR or load from disk if appropriate
let enr = crate::discovery::enr::build_or_load_enr::<E>(
local_keypair.clone(),
&config,
&ctx.enr_fork_id,
&log,
&ctx.chain_spec,
)?;
// Construct the metadata
let custody_subnet_count = if ctx.chain_spec.is_peer_das_scheduled() {
if config.subscribe_all_data_column_subnets {
Some(ctx.chain_spec.data_column_sidecar_subnet_count)
} else {
Some(ctx.chain_spec.custody_requirement)
}
// Create an ENR or load from disk if appropriate
let enr = crate::discovery::enr::build_or_load_enr::<E>(
local_keypair.clone(),
&config,
&ctx.enr_fork_id,
&log,
&ctx.chain_spec,
)?;

// Construct the metadata
let custody_subnet_count = ctx.chain_spec.is_peer_das_scheduled().then(|| {
if config.subscribe_all_data_column_subnets {
ctx.chain_spec.data_column_sidecar_subnet_count
} else {
None
};
let meta_data =
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
let globals = NetworkGlobals::new(
enr,
meta_data,
trusted_peers,
config.disable_peer_scoring,
&log,
config.clone(),
ctx.chain_spec.clone(),
);
Arc::new(globals)
};
ctx.chain_spec.custody_requirement
}
});
let meta_data =
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
let seq_number = *meta_data.seq_number();
let globals = NetworkGlobals::new(
enr,
meta_data,
trusted_peers,
config.disable_peer_scoring,
&log,
config.clone(),
ctx.chain_spec.clone(),
);
let network_globals = Arc::new(globals);

// Grab our local ENR FORK ID
let enr_fork_id = network_globals
Expand Down Expand Up @@ -338,6 +336,7 @@ impl<E: EthSpec> Network<E> {
config.outbound_rate_limiter_config.clone(),
log.clone(),
network_params,
seq_number,
);

let discovery = {
Expand Down Expand Up @@ -1104,33 +1103,26 @@ impl<E: EthSpec> Network<E> {
.sync_committee_bitfield::<E>()
.expect("Local discovery must have sync committee bitfield");

{
// write lock scope
let mut meta_data = self.network_globals.local_metadata.write();
// write lock scope
let mut meta_data_w = self.network_globals.local_metadata.write();

*meta_data.seq_number_mut() += 1;
*meta_data.attnets_mut() = local_attnets;
if let Ok(syncnets) = meta_data.syncnets_mut() {
*syncnets = local_syncnets;
}
*meta_data_w.seq_number_mut() += 1;
*meta_data_w.attnets_mut() = local_attnets;
if let Ok(syncnets) = meta_data_w.syncnets_mut() {
*syncnets = local_syncnets;
}
let seq_number = *meta_data_w.seq_number();
let meta_data = meta_data_w.clone();

drop(meta_data_w);
self.eth2_rpc_mut().update_seq_number(seq_number);
// Save the updated metadata to disk
utils::save_metadata_to_disk(
&self.network_dir,
self.network_globals.local_metadata.read().clone(),
&self.log,
);
utils::save_metadata_to_disk(&self.network_dir, meta_data, &self.log);
}

/// Sends a Ping request to the peer.
fn ping(&mut self, peer_id: PeerId) {
let ping = crate::rpc::Ping {
data: *self.network_globals.local_metadata.read().seq_number(),
};
trace!(self.log, "Sending Ping"; "peer_id" => %peer_id);
let id = RequestId::Internal;
self.eth2_rpc_mut()
.send_request(peer_id, id, OutboundRequest::Ping(ping));
self.eth2_rpc_mut().ping(peer_id, RequestId::Internal);
}

/// Sends a METADATA request to a peer.
Expand Down Expand Up @@ -1400,8 +1392,12 @@ impl<E: EthSpec> Network<E> {
fn inject_rpc_event(&mut self, event: RPCMessage<RequestId, E>) -> Option<NetworkEvent<E>> {
let peer_id = event.peer_id;

// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
if !self.peer_manager().is_connected(&peer_id) {
// Do not permit Inbound events from peers that are being disconnected or RPC requests,
// but allow `RpcFailed` and `HandlerErr::Outbound` to be bubble up to sync for state management.
if !self.peer_manager().is_connected(&peer_id)
&& (matches!(event.message, Err(HandlerErr::Inbound { .. }))
|| matches!(event.message, Ok(RPCReceived::Request(..))))
{
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
Expand Down

0 comments on commit 5d1ff7c

Please sign in to comment.