From 7e0c630e1e21c4cdbf86ed94c696d023c47fad35 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Fri, 14 Jun 2024 22:30:08 +0900
Subject: [PATCH] Add inbound request size limiter

---
 beacon_node/lighthouse_network/src/rpc/mod.rs |  98 +++++++-------
 .../src/rpc/rate_limiter.rs                   |  88 ++++++++++++
 .../lighthouse_network/tests/common.rs        |  19 ++-
 .../lighthouse_network/tests/rpc_tests.rs     | 127 ++++++++++++++++++
 4 files changed, 278 insertions(+), 54 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs
index 6e2c256dc30..1dded86f9fa 100644
--- a/beacon_node/lighthouse_network/src/rpc/mod.rs
+++ b/beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -13,7 +13,7 @@ use libp2p::swarm::{
 use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
 use libp2p::PeerId;
 use parking_lot::Mutex;
-use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
+use rate_limiter::RPCRateLimiter as RateLimiter;
 use slog::{crit, debug, o};
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -25,6 +25,7 @@
 pub(crate) use handler::{HandlerErr, HandlerEvent};
 pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse};
 pub(crate) use protocol::InboundRequest;
+use crate::rpc::rate_limiter::InboundRequestSizeLimiter;
 pub use handler::SubstreamId;
 pub use methods::{
     BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
@@ -124,6 +125,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
     response_limiter: Option<Arc<Mutex<RateLimiter>>>,
     /// Rate limiter for our own requests.
     self_limiter: Option<SelfRateLimiter<Id, E>>,
+    /// Limiter for our inbound requests, which checks the request size.
+    inbound_request_size_limiter: Option<InboundRequestSizeLimiter>,
     /// Queue of events to be processed.
     events: Vec<BehaviourAction<Id, E>>,
     fork_context: Arc<ForkContext>,
@@ -152,7 +155,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
         // });
         let inbound_limiter = None; // TODO

-        let response_limiter = inbound_rate_limiter_config.map(|config| {
+        let response_limiter = inbound_rate_limiter_config.clone().map(|config| {
             debug!(log, "Using response rate limiting params"; "config" => ?config);
             Arc::new(Mutex::new(
                 RateLimiter::new_with_config(config.0)
@@ -160,6 +163,11 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
                     .expect("Inbound limiter configuration parameters are valid"),
             ))
         });

+        let inbound_request_size_limiter = inbound_rate_limiter_config.map(|config| {
+            InboundRequestSizeLimiter::new_with_config(config.0)
+                .expect("Inbound limiter configuration parameters are valid")
+        });
+
         let self_limiter = outbound_rate_limiter_config.map(|config| {
             SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
         });
@@ -168,6 +176,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
             limiter: inbound_limiter,
             response_limiter,
             self_limiter,
+            inbound_request_size_limiter,
             events: Vec::new(),
             fork_context,
             enable_light_client_server,
@@ -315,57 +324,42 @@ where
     ) {
         match event {
             HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
-                if let Some(limiter) = self.limiter.as_mut() {
-                    // check if the request is conformant to the quota
-                    match limiter.allows(&peer_id, req) {
-                        Ok(()) => {
-                            // send the event to the user
-                            self.events.push(ToSwarm::GenerateEvent(RPCMessage {
-                                peer_id,
-                                conn_id,
-                                event,
-                            }))
-                        }
-                        Err(RateLimitedErr::TooLarge) => {
-                            // we set the batch sizes, so this is a coding/config err for most protocols
-                            let protocol = req.versioned_protocol().protocol();
-                            if matches!(
-                                protocol,
-                                Protocol::BlocksByRange
-                                    | Protocol::BlobsByRange
-                                    | Protocol::BlocksByRoot
-                                    | Protocol::BlobsByRoot
-                            ) {
-                                debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
-                            } else {
-                                // Other protocols shouldn't be sending large messages, we should flag the peer kind
-                                crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
-                            }
-                            // send an error code to the peer.
-                            // the handler upon receiving the error code will send it back to the behaviour
-                            self.send_response(
-                                peer_id,
-                                (conn_id, *id),
-                                RPCCodedResponse::Error(
-                                    RPCResponseErrorCode::RateLimited,
-                                    "Rate limited. Request too large".into(),
-                                ),
-                            );
-                        }
-                        Err(RateLimitedErr::TooSoon(wait_time)) => {
-                            debug!(self.log, "Request exceeds the rate limit";
-                                "request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
-                            // send an error code to the peer.
-                            // the handler upon receiving the error code will send it back to the behaviour
-                            self.send_response(
-                                peer_id,
-                                (conn_id, *id),
-                                RPCCodedResponse::Error(
-                                    RPCResponseErrorCode::RateLimited,
-                                    format!("Wait {:?}", wait_time).into(),
-                                ),
-                            );
-                        }
-                    }
+                // TODO: Send an error response if there is an ongoing request with the same protocol.
+
+                if let Some(limiter) = self.inbound_request_size_limiter.as_ref() {
+                    // Check if the request is conformant to the quota
+                    if limiter.allows(req) {
+                        // Send the event to the user
+                        self.events.push(ToSwarm::GenerateEvent(RPCMessage {
+                            peer_id,
+                            conn_id,
+                            event,
+                        }))
+                    } else {
+                        // We set the batch sizes, so this is a coding/config err for most protocols
+                        let protocol = req.versioned_protocol().protocol();
+                        if matches!(
+                            protocol,
+                            Protocol::BlocksByRange
+                                | Protocol::BlobsByRange
+                                | Protocol::BlocksByRoot
+                                | Protocol::BlobsByRoot
+                        ) {
+                            debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
+                        } else {
+                            // Other protocols shouldn't be sending large messages; we should flag the peer kind
+                            crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
+                        }
+                        // Send an error code to the peer.
+                        // The handler, upon receiving the error code, will send it back to the behaviour.
+                        self.send_response(
+                            peer_id,
+                            (conn_id, *id),
+                            RPCCodedResponse::Error(
+                                RPCResponseErrorCode::RateLimited,
+                                "Rate limited. Request too large".into(),
+                            ),
+                        );
+                    }
                 } else {
                     // No rate limiting, send the event to the user
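With this change, an oversized inbound request is rejected up front with a RateLimited error code rather than waiting on the response limiter. On the requesting side the rejection surfaces as an RPCFailed event. A minimal sketch of observing it, using the event and type names exercised by the tests later in this patch:

    // Sketch: the sender sees the rejection as a failed RPC with a RateLimited code.
    match sender.next_event().await {
        NetworkEvent::RPCFailed { id, error, .. } => {
            assert!(matches!(
                error,
                RPCError::ErrorResponse(RPCResponseErrorCode::RateLimited, ..)
            ));
            debug!(log, "Request rejected as too large"; "request_id" => id);
        }
        _ => {} // Other events are ignored in this sketch.
    }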
diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
index 10e62550659..e11c190ca37 100644
--- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
+++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -434,6 +434,94 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
     }
 }

+pub(super) struct InboundRequestSizeLimiter {
+    ping: (Nanosecs, Nanosecs),
+    status: (Nanosecs, Nanosecs),
+    meta_data: (Nanosecs, Nanosecs),
+    goodbye: (Nanosecs, Nanosecs),
+    blocks_by_range: (Nanosecs, Nanosecs),
+    blocks_by_root: (Nanosecs, Nanosecs),
+    blobs_by_range: (Nanosecs, Nanosecs),
+    blobs_by_root: (Nanosecs, Nanosecs),
+    light_client_bootstrap: (Nanosecs, Nanosecs),
+    light_client_optimistic_update: (Nanosecs, Nanosecs),
+    light_client_finality_update: (Nanosecs, Nanosecs),
+}
+
+impl InboundRequestSizeLimiter {
+    pub fn new_with_config(config: RateLimiterConfig) -> Result<Self, &'static str> {
+        // Destructure to make sure every configuration value is used.
+        let RateLimiterConfig {
+            ping_quota,
+            meta_data_quota,
+            status_quota,
+            goodbye_quota,
+            blocks_by_range_quota,
+            blocks_by_root_quota,
+            blobs_by_range_quota,
+            blobs_by_root_quota,
+            light_client_bootstrap_quota,
+            light_client_optimistic_update_quota,
+            light_client_finality_update_quota,
+        } = config;
+
+        let tau_and_t = |quota: &Quota| {
+            // Guard against division by zero below.
+            if quota.max_tokens == 0 {
+                return Err("Max number of tokens should be positive");
+            }
+            let tau = quota.replenish_all_every.as_nanos();
+            if tau == 0 {
+                return Err("Replenish time must be positive");
+            }
+            let t = (tau / quota.max_tokens as u128)
+                .try_into()
+                .map_err(|_| "total replenish time is too long")?;
+            let tau = tau
+                .try_into()
+                .map_err(|_| "total replenish time is too long")?;
+            Ok((tau, t))
+        };
+
+        Ok(Self {
+            ping: tau_and_t(&ping_quota)?,
+            meta_data: tau_and_t(&meta_data_quota)?,
+            status: tau_and_t(&status_quota)?,
+            goodbye: tau_and_t(&goodbye_quota)?,
+            blocks_by_range: tau_and_t(&blocks_by_range_quota)?,
+            blocks_by_root: tau_and_t(&blocks_by_root_quota)?,
+            blobs_by_range: tau_and_t(&blobs_by_range_quota)?,
+            blobs_by_root: tau_and_t(&blobs_by_root_quota)?,
+            light_client_bootstrap: tau_and_t(&light_client_bootstrap_quota)?,
+            light_client_optimistic_update: tau_and_t(&light_client_optimistic_update_quota)?,
+            light_client_finality_update: tau_and_t(&light_client_finality_update_quota)?,
+        })
+    }
+
+    pub fn allows<Item: RateLimiterItem>(&self, request: &Item) -> bool {
+        let tokens = request.max_responses().max(1);
+        let (tau, t) = match request.protocol() {
+            Protocol::Ping => self.ping,
+            Protocol::Status => self.status,
+            Protocol::MetaData => self.meta_data,
+            Protocol::Goodbye => self.goodbye,
+            Protocol::BlocksByRange => self.blocks_by_range,
+            Protocol::BlocksByRoot => self.blocks_by_root,
+            Protocol::BlobsByRange => self.blobs_by_range,
+            Protocol::BlobsByRoot => self.blobs_by_root,
+            Protocol::LightClientBootstrap => self.light_client_bootstrap,
+            Protocol::LightClientOptimisticUpdate => self.light_client_optimistic_update,
+            Protocol::LightClientFinalityUpdate => self.light_client_finality_update,
+        };
+
+        // How long it takes to replenish this many tokens.
+        let additional_time = t * tokens;
+
+        if additional_time > tau {
+            // The time required to process this number of tokens exceeds the time it takes
+            // to fill the whole bucket, so this request can _never_ be processed.
+            return false;
+        }
+        true
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::rpc::rate_limiter::{Limiter, Quota};
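The allows check above never consumes tokens; it only answers whether the request could ever fit in the bucket. A self-contained sketch of the arithmetic, using the beacon_blocks_by_range:1/1024 quota from the new test (1 token, replenished over 1024 seconds; variable names mirror the code above):

    // tau: nanoseconds to refill the whole bucket; t: nanoseconds per token.
    let tau: u64 = 1024 * 1_000_000_000; // replenish_all_every = 1024 s
    let t: u64 = tau / 1; // max_tokens = 1
    // BlocksByRangeRequest::new(0, 2) expects up to 2 responses, i.e. 2 tokens.
    let tokens: u64 = 2;
    // Replenishing 2 tokens takes longer than filling the whole bucket,
    // so the limiter rejects the request outright.
    assert!(t * tokens > tau);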
diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs
index 3daf9971ccd..f2d18771399 100644
--- a/beacon_node/lighthouse_network/tests/common.rs
+++ b/beacon_node/lighthouse_network/tests/common.rs
@@ -79,6 +79,7 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {

 pub fn build_config(
     mut boot_nodes: Vec<Enr>,
+    disable_peer_scoring: bool,
     inbound_rate_limiter: Option<InboundRateLimiterConfig>,
 ) -> NetworkConfig {
     let mut config = NetworkConfig::default();
@@ -96,6 +97,7 @@ pub fn build_config(
     config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
     config.boot_nodes_enr.append(&mut boot_nodes);
     config.network_dir = path.into_path();
+    config.disable_peer_scoring = disable_peer_scoring;
     config.inbound_rate_limiter_config = inbound_rate_limiter;
     config
 }
@@ -106,9 +108,10 @@ pub async fn build_libp2p_instance(
     log: slog::Logger,
     fork_name: ForkName,
     spec: &ChainSpec,
+    disable_peer_scoring: bool,
     inbound_rate_limiter: Option<InboundRateLimiterConfig>,
 ) -> Libp2pInstance {
-    let config = build_config(boot_nodes, inbound_rate_limiter);
+    let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);

     // launch libp2p service
     let (signal, exit) = async_channel::bounded(1);
@@ -150,6 +153,7 @@ pub async fn build_node_pair(
     fork_name: ForkName,
     spec: &ChainSpec,
     protocol: Protocol,
+    disable_peer_scoring: bool,
     inbound_rate_limiter: Option<InboundRateLimiterConfig>,
 ) -> (Libp2pInstance, Libp2pInstance) {
     let sender_log = log.new(o!("who" => "sender"));
@@ -161,6 +165,7 @@
         sender_log,
         fork_name,
         spec,
+        disable_peer_scoring,
         inbound_rate_limiter.clone(),
     )
     .await;
@@ -170,6 +175,7 @@
         receiver_log,
         fork_name,
         spec,
+        disable_peer_scoring,
         inbound_rate_limiter,
     )
     .await;
@@ -246,7 +252,16 @@
     let mut nodes = Vec::with_capacity(n);
     for _ in 0..n {
         nodes.push(
-            build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec, None).await,
+            build_libp2p_instance(
+                rt.clone(),
+                vec![],
+                log.clone(),
+                fork_name,
+                spec,
+                false,
+                None,
+            )
+            .await,
         );
     }
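The test helpers simply thread the new flag through to NetworkConfig. A usage sketch of the updated signature (argument values are illustrative only):

    // Build a test config with peer scoring left enabled and no inbound rate limiting.
    let config = build_config(
        vec![], // boot_nodes
        false,  // disable_peer_scoring
        None,   // inbound_rate_limiter
    );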
diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs
index a35527bc1fa..7135228f0b2 100644
--- a/beacon_node/lighthouse_network/tests/rpc_tests.rs
+++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -12,6 +12,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::runtime::Runtime;
 use tokio::time::sleep;
+use types::blob_sidecar::BlobIdentifier;
 use types::{
     BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
     EmptyBlock, Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Signature,
@@ -70,6 +71,7 @@ fn test_tcp_status_rpc() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -171,6 +173,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
         ForkName::Bellatrix,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -305,6 +308,7 @@ fn test_blobs_by_range_chunked_rpc() {
         ForkName::Deneb,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -418,6 +422,7 @@ fn test_tcp_blocks_by_range_over_limit() {
         ForkName::Bellatrix,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -510,6 +515,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -639,6 +645,7 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -747,6 +754,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
         ForkName::Bellatrix,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -884,6 +892,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -1022,6 +1031,7 @@ fn test_disconnect_triggers_rpc_error() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         None,
     )
     .await;
@@ -1112,6 +1122,7 @@ fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
         ForkName::Base,
         &spec,
         protocol,
+        false,
         None,
     )
     .await;
@@ -1193,6 +1204,7 @@ fn test_delayed_rpc_response() {
         ForkName::Base,
         &spec,
         Protocol::Tcp,
+        false,
         // Configure a quota for STATUS responses of 1 token every 3 seconds.
         Some("status:1/3".parse().unwrap()),
     )
     .await;
@@ -1297,3 +1309,118 @@ fn test_delayed_rpc_response() {
         }
     })
 }
+
+// Test that the receiver sends an RPC error when the request is too large.
+#[test]
+fn test_request_too_large() {
+    let rt = Arc::new(Runtime::new().unwrap());
+    let log = logging::test_logger();
+    let spec = E::default_spec();
+
+    rt.block_on(async {
+        // Configure quotas for requests.
+        let quotas = vec![
+            "beacon_blocks_by_range:1/1024",
+            "beacon_blocks_by_root:1/128",
+            "blob_sidecars_by_range:1/768",
+            "blob_sidecars_by_root:1/128",
+        ];
+
+        let (mut sender, mut receiver) = common::build_node_pair(
+            Arc::downgrade(&rt),
+            &log,
+            ForkName::Base,
+            &spec,
+            Protocol::Tcp,
+            // In this test, many RPC errors occur (which are expected). Disable peer scoring
+            // to avoid banning the peer, so that we can verify the receiver sends RPC errors
+            // back to the sender.
+            true,
+            Some(quotas.join(";").parse().unwrap()),
+        )
+        .await;
+
+        // RPC requests that trigger an RPC error (request too large) on the receiver side.
+        let mut rpc_requests = vec![
+            Request::BlocksByRange(BlocksByRangeRequest::new(0, 2)),
+            Request::BlocksByRoot(BlocksByRootRequest::new(
+                vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)],
+                &spec,
+            )),
+            Request::BlobsByRange(BlobsByRangeRequest {
+                start_slot: 0,
+                count: 32,
+            }),
+            Request::BlobsByRoot(BlobsByRootRequest::new(
+                vec![
+                    BlobIdentifier {
+                        block_root: Hash256::zero(),
+                        index: 0,
+                    },
+                    BlobIdentifier {
+                        block_root: Hash256::zero(),
+                        index: 1,
+                    },
+                ],
+                &spec,
+            )),
+        ];
+        let requests_to_be_failed = rpc_requests.len();
+        let mut failed_request_ids = vec![];
+
+        // Build the sender future
+        let sender_future = async {
+            let mut request_id = 1;
+            loop {
+                match sender.next_event().await {
+                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
+                        let request = rpc_requests.pop().unwrap();
+                        debug!(log, "Sending RPC request"; "request_id" => request_id, "request" => ?request);
+                        sender.send_request(peer_id, request_id, request);
+                    }
+                    NetworkEvent::ResponseReceived { id, response, .. } => {
+                        debug!(log, "Received response"; "request_id" => id, "response" => ?response);
+                        // Handle the response termination.
+                        match response {
+                            Response::BlocksByRange(None)
+                            | Response::BlocksByRoot(None)
+                            | Response::BlobsByRange(None)
+                            | Response::BlobsByRoot(None) => {}
+                            _ => unreachable!(),
+                        }
+                    }
+                    NetworkEvent::RPCFailed { id, peer_id, error } => {
+                        debug!(log, "RPC Failed"; "error" => ?error, "request_id" => id);
+                        assert_eq!(id, request_id);
+                        assert!(matches!(
+                            error,
+                            RPCError::ErrorResponse(RPCResponseErrorCode::RateLimited, ..)
+                        ));
+                        failed_request_ids.push(id);
+                        if let Some(request) = rpc_requests.pop() {
+                            request_id += 1;
+                            debug!(log, "Sending RPC request"; "request_id" => request_id, "request" => ?request);
+                            sender.send_request(peer_id, request_id, request);
+                        } else {
+                            assert_eq!(failed_request_ids.len(), requests_to_be_failed);
+                            return;
+                        }
+                    }
+                    _ => {}
+                }
+            }
+        };
+
+        // Build the receiver future
+        let receiver_future = async {
+            loop {
+                match receiver.next_event().await {
+                    NetworkEvent::RequestReceived { .. } => {
+                        // All requests in this test are oversized, so none should reach the
+                        // receiver's application layer.
+                        unreachable!();
+                    }
+                    _ => {} // Ignore other events
+                }
+            }
+        };
+
+        tokio::select! {
+            _ = sender_future => {}
+            _ = receiver_future => {}
+            _ = sleep(Duration::from_secs(30)) => {
+                panic!("Future timed out");
+            }
+        }
+    });
+}
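For reference, the quota strings joined in test_request_too_large follow the protocol_name:max_tokens/replenish_all_every_seconds format already used by test_delayed_rpc_response ("status:1/3" is one token every three seconds). A sketch of building the same config outside a test, assuming the InboundRateLimiterConfig type restored in common.rs above:

    // Sketch: parse a semicolon-separated quota list into an inbound limiter config.
    // With `beacon_blocks_by_range:1/1024` the bucket holds a single token, so any
    // BlocksByRange request expecting more than one response is rejected as too large.
    let quotas = "beacon_blocks_by_range:1/1024;beacon_blocks_by_root:1/128";
    let config: InboundRateLimiterConfig = quotas.parse().expect("valid quota strings");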