diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 08e55e50c9c..6f338ebc8be 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -4,7 +4,7 @@ use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode}; use super::outbound::OutboundRequestContainer; use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol}; -use super::{RPCReceived, RPCResponse, RPCSend, ReqId}; +use super::{RPCReceived, RPCSend, ReqId}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; use crate::rpc::protocol::InboundFramed; use fnv::FnvHashMap; @@ -14,8 +14,7 @@ use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; -use libp2p::swarm::{ConnectionId, Stream}; -use libp2p::PeerId; +use libp2p::swarm::Stream; use slog::{crit, debug, trace}; use smallvec::SmallVec; use std::{ @@ -89,12 +88,6 @@ pub struct RPCHandler where E: EthSpec, { - /// This `ConnectionId`. - id: ConnectionId, - - /// The matching `PeerId` of this connection. - peer_id: PeerId, - /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, ()>, @@ -225,16 +218,12 @@ where E: EthSpec, { pub fn new( - id: ConnectionId, - peer_id: PeerId, listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, log: &slog::Logger, resp_timeout: Duration, ) -> Self { RPCHandler { - id, - peer_id, listen_protocol, events_out: SmallVec::new(), dial_queue: SmallVec::new(), @@ -903,15 +892,6 @@ where self.shutdown(None); } - // If we received a Ping, we queue a Pong response. - if let InboundRequest::Ping(ping) = req { - trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id); - self.send_response( - self.current_inbound_substream_id, - RPCCodedResponse::Success(RPCResponse::Pong(ping)), - ); - } - self.events_out.push(HandlerEvent::Ok(RPCReceived::Request( self.current_inbound_substream_id, req, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index eae206e022d..4961c31d28e 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -14,7 +14,7 @@ use libp2p::swarm::{ use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent}; use libp2p::PeerId; use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}; -use slog::{crit, debug, o}; +use slog::{crit, debug, o, trace}; use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; @@ -132,6 +132,8 @@ pub struct RPC { log: slog::Logger, /// Networking constant values network_params: NetworkParams, + /// A sequential counter indicating when data gets modified. + seq_number: u64, } impl RPC { @@ -142,6 +144,7 @@ impl RPC { outbound_rate_limiter_config: Option, log: slog::Logger, network_params: NetworkParams, + seq_number: u64, ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); @@ -163,6 +166,7 @@ impl RPC { enable_light_client_server, log, network_params, + seq_number, } } @@ -214,6 +218,19 @@ impl RPC { event: RPCSend::Shutdown(id, reason), }); } + + pub fn update_seq_number(&mut self, seq_number: u64) { + self.seq_number = seq_number + } + + /// Send a Ping request to the destination `PeerId` via `ConnectionId`. + pub fn ping(&mut self, peer_id: PeerId, id: Id) { + let ping = Ping { + data: self.seq_number, + }; + trace!(self.log, "Sending Ping"; "peer_id" => %peer_id); + self.send_request(peer_id, id, OutboundRequest::Ping(ping)); + } } impl NetworkBehaviour for RPC @@ -245,8 +262,6 @@ where .log .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( - connection_id, - peer_id, protocol, self.fork_context.clone(), &log, @@ -280,8 +295,6 @@ where .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( - connection_id, - peer_id, protocol, self.fork_context.clone(), &log, @@ -359,14 +372,6 @@ where if let Some(limiter) = self.limiter.as_mut() { // check if the request is conformant to the quota match limiter.allows(&peer_id, &req) { - Ok(()) => { - // send the event to the user - self.events.push(ToSwarm::GenerateEvent(RPCMessage { - peer_id, - conn_id, - message: Ok(RPCReceived::Request(id, req)), - })) - } Err(RateLimitedErr::TooLarge) => { // we set the batch sizes, so this is a coding/config err for most protocols let protocol = req.versioned_protocol().protocol(); @@ -394,6 +399,7 @@ where "Rate limited. Request too large".into(), ), ); + return; } Err(RateLimitedErr::TooSoon(wait_time)) => { debug!(self.log, "Request exceeds the rate limit"; @@ -408,16 +414,29 @@ where format!("Wait {:?}", wait_time).into(), ), ); + return; } + // No rate limiting, continue. + Ok(_) => {} } - } else { - // No rate limiting, send the event to the user - self.events.push(ToSwarm::GenerateEvent(RPCMessage { + } + // If we received a Ping, we queue a Pong response. + if let InboundRequest::Ping(_) = req { + trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %conn_id, "peer_id" => %peer_id); + self.send_response( peer_id, - conn_id, - message: Ok(RPCReceived::Request(id, req)), - })) + (conn_id, id), + RPCCodedResponse::Success(RPCResponse::Pong(Ping { + data: self.seq_number, + })), + ); } + + self.events.push(ToSwarm::GenerateEvent(RPCMessage { + peer_id, + conn_id, + message: Ok(RPCReceived::Request(id, req)), + })); } HandlerEvent::Ok(rpc) => { self.events.push(ToSwarm::GenerateEvent(RPCMessage { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 43217ba5ab6..ede8fdd13a7 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -159,38 +159,36 @@ impl Network { .collect(); // set up a collection of variables accessible outside of the network crate - let network_globals = { - // Create an ENR or load from disk if appropriate - let enr = crate::discovery::enr::build_or_load_enr::( - local_keypair.clone(), - &config, - &ctx.enr_fork_id, - &log, - &ctx.chain_spec, - )?; - // Construct the metadata - let custody_subnet_count = if ctx.chain_spec.is_peer_das_scheduled() { - if config.subscribe_all_data_column_subnets { - Some(ctx.chain_spec.data_column_sidecar_subnet_count) - } else { - Some(ctx.chain_spec.custody_requirement) - } + // Create an ENR or load from disk if appropriate + let enr = crate::discovery::enr::build_or_load_enr::( + local_keypair.clone(), + &config, + &ctx.enr_fork_id, + &log, + &ctx.chain_spec, + )?; + + // Construct the metadata + let custody_subnet_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { + if config.subscribe_all_data_column_subnets { + ctx.chain_spec.data_column_sidecar_subnet_count } else { - None - }; - let meta_data = - utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log); - let globals = NetworkGlobals::new( - enr, - meta_data, - trusted_peers, - config.disable_peer_scoring, - &log, - config.clone(), - ctx.chain_spec.clone(), - ); - Arc::new(globals) - }; + ctx.chain_spec.custody_requirement + } + }); + let meta_data = + utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log); + let seq_number = *meta_data.seq_number(); + let globals = NetworkGlobals::new( + enr, + meta_data, + trusted_peers, + config.disable_peer_scoring, + &log, + config.clone(), + ctx.chain_spec.clone(), + ); + let network_globals = Arc::new(globals); // Grab our local ENR FORK ID let enr_fork_id = network_globals @@ -338,6 +336,7 @@ impl Network { config.outbound_rate_limiter_config.clone(), log.clone(), network_params, + seq_number, ); let discovery = { @@ -1104,33 +1103,26 @@ impl Network { .sync_committee_bitfield::() .expect("Local discovery must have sync committee bitfield"); - { - // write lock scope - let mut meta_data = self.network_globals.local_metadata.write(); + // write lock scope + let mut meta_data_w = self.network_globals.local_metadata.write(); - *meta_data.seq_number_mut() += 1; - *meta_data.attnets_mut() = local_attnets; - if let Ok(syncnets) = meta_data.syncnets_mut() { - *syncnets = local_syncnets; - } + *meta_data_w.seq_number_mut() += 1; + *meta_data_w.attnets_mut() = local_attnets; + if let Ok(syncnets) = meta_data_w.syncnets_mut() { + *syncnets = local_syncnets; } + let seq_number = *meta_data_w.seq_number(); + let meta_data = meta_data_w.clone(); + + drop(meta_data_w); + self.eth2_rpc_mut().update_seq_number(seq_number); // Save the updated metadata to disk - utils::save_metadata_to_disk( - &self.network_dir, - self.network_globals.local_metadata.read().clone(), - &self.log, - ); + utils::save_metadata_to_disk(&self.network_dir, meta_data, &self.log); } /// Sends a Ping request to the peer. fn ping(&mut self, peer_id: PeerId) { - let ping = crate::rpc::Ping { - data: *self.network_globals.local_metadata.read().seq_number(), - }; - trace!(self.log, "Sending Ping"; "peer_id" => %peer_id); - let id = RequestId::Internal; - self.eth2_rpc_mut() - .send_request(peer_id, id, OutboundRequest::Ping(ping)); + self.eth2_rpc_mut().ping(peer_id, RequestId::Internal); } /// Sends a METADATA request to a peer. @@ -1400,8 +1392,12 @@ impl Network { fn inject_rpc_event(&mut self, event: RPCMessage) -> Option> { let peer_id = event.peer_id; - // Do not permit Inbound events from peers that are being disconnected, or RPC requests. - if !self.peer_manager().is_connected(&peer_id) { + // Do not permit Inbound events from peers that are being disconnected or RPC requests, + // but allow `RpcFailed` and `HandlerErr::Outbound` to be bubble up to sync for state management. + if !self.peer_manager().is_connected(&peer_id) + && (matches!(event.message, Err(HandlerErr::Inbound { .. })) + || matches!(event.message, Ok(RPCReceived::Request(..)))) + { debug!( self.log, "Ignoring rpc message of disconnecting peer";