Skip to content

Commit

Permalink
Add inbound request size limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Jun 19, 2024
1 parent c0ae632 commit 7e0c630
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 54 deletions.
98 changes: 46 additions & 52 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use libp2p::swarm::{
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use parking_lot::Mutex;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use rate_limiter::RPCRateLimiter as RateLimiter;
use slog::{crit, debug, o};
use std::marker::PhantomData;
use std::sync::Arc;
Expand All @@ -25,6 +25,7 @@ pub(crate) use handler::{HandlerErr, HandlerEvent};
pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse};
pub(crate) use protocol::InboundRequest;

use crate::rpc::rate_limiter::InboundRequestSizeLimiter;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
Expand Down Expand Up @@ -124,6 +125,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
response_limiter: Option<Arc<Mutex<RateLimiter>>>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, E>>,
/// Limiter for our inbound requests, which checks the request size.
inbound_request_size_limiter: Option<InboundRequestSizeLimiter>,
/// Queue of events to be processed.
events: Vec<BehaviourAction<Id, E>>,
fork_context: Arc<ForkContext>,
Expand Down Expand Up @@ -152,14 +155,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
// });
let inbound_limiter = None; // TODO

let response_limiter = inbound_rate_limiter_config.map(|config| {
let response_limiter = inbound_rate_limiter_config.clone().map(|config| {
debug!(log, "Using response rate limiting params"; "config" => ?config);
Arc::new(Mutex::new(
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid"),
))
});

let inbound_request_size_limiter = inbound_rate_limiter_config.map(|config| {
InboundRequestSizeLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
});

let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});
Expand All @@ -168,6 +176,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
limiter: inbound_limiter,
response_limiter,
self_limiter,
inbound_request_size_limiter,
events: Vec::new(),
fork_context,
enable_light_client_server,
Expand Down Expand Up @@ -315,57 +324,42 @@ where
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.versioned_protocol().protocol();
if matches!(
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
) {
debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
"request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
// TODO: Send error response if there is ongoing request with the same protocol.

if let Some(limiter) = self.inbound_request_size_limiter.as_ref() {
// Check if the request is conformant to the quota
if limiter.allows(req) {
// Send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
} else {
// We set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.versioned_protocol().protocol();
if matches!(
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
) {
debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// Send an error code to the peer.
// The handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
} else {
// No rate limiting, send the event to the user
Expand Down
88 changes: 88 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,94 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
}
}

pub(super) struct InboundRequestSizeLimiter {
ping: (Nanosecs, Nanosecs),
status: (Nanosecs, Nanosecs),
meta_data: (Nanosecs, Nanosecs),
goodbye: (Nanosecs, Nanosecs),
blocks_by_range: (Nanosecs, Nanosecs),
blocks_by_root: (Nanosecs, Nanosecs),
blobs_by_range: (Nanosecs, Nanosecs),
blobs_by_root: (Nanosecs, Nanosecs),
light_client_bootstrap: (Nanosecs, Nanosecs),
light_client_optimistic_update: (Nanosecs, Nanosecs),
light_client_finality_update: (Nanosecs, Nanosecs),
}

impl InboundRequestSizeLimiter {
pub fn new_with_config(config: RateLimiterConfig) -> Result<Self, &'static str> {
// Destructure to make sure every configuration value is used.
let RateLimiterConfig {
ping_quota,
meta_data_quota,
status_quota,
goodbye_quota,
blocks_by_range_quota,
blocks_by_root_quota,
blobs_by_range_quota,
blobs_by_root_quota,
light_client_bootstrap_quota,
light_client_optimistic_update_quota,
light_client_finality_update_quota,
} = config;

let tau_and_t = |quota: &Quota| {
let tau = quota.replenish_all_every.as_nanos();
if tau == 0 {
return Err("Replenish time must be positive");
}
let t = (tau / quota.max_tokens as u128)
.try_into()
.map_err(|_| "total replenish time is too long")?;
let tau = tau
.try_into()
.map_err(|_| "total replenish time is too long")?;
Ok((tau, t))
};

Ok(Self {
ping: tau_and_t(&ping_quota)?,
meta_data: tau_and_t(&meta_data_quota)?,
status: tau_and_t(&status_quota)?,
goodbye: tau_and_t(&goodbye_quota)?,
blocks_by_range: tau_and_t(&blocks_by_range_quota)?,
blocks_by_root: tau_and_t(&blocks_by_root_quota)?,
blobs_by_range: tau_and_t(&blobs_by_range_quota)?,
blobs_by_root: tau_and_t(&blobs_by_root_quota)?,
light_client_bootstrap: tau_and_t(&light_client_bootstrap_quota)?,
light_client_optimistic_update: tau_and_t(&light_client_optimistic_update_quota)?,
light_client_finality_update: tau_and_t(&light_client_finality_update_quota)?,
})
}

pub fn allows<Item: RateLimiterItem>(&self, request: &Item) -> bool {
let tokens = request.max_responses().max(1);
let (tau, t) = match request.protocol() {
Protocol::Ping => self.ping,
Protocol::Status => self.status,
Protocol::MetaData => self.meta_data,
Protocol::Goodbye => self.goodbye,
Protocol::BlocksByRange => self.blocks_by_range,
Protocol::BlocksByRoot => self.blocks_by_root,
Protocol::BlobsByRange => self.blobs_by_range,
Protocol::BlobsByRoot => self.blobs_by_root,
Protocol::LightClientBootstrap => self.light_client_bootstrap,
Protocol::LightClientOptimisticUpdate => self.light_client_optimistic_update,
Protocol::LightClientFinalityUpdate => self.light_client_finality_update,
};

// how long does it take to replenish these tokens
let additional_time = t * tokens;

if additional_time > tau {
// the time required to process this amount of tokens is longer than the time that
// makes the bucket full. So, this batch can _never_ be processed
return false;
}
true
}
}

#[cfg(test)]
mod tests {
use crate::rpc::rate_limiter::{Limiter, Quota};
Expand Down
19 changes: 17 additions & 2 deletions beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {

pub fn build_config(
mut boot_nodes: Vec<Enr>,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> NetworkConfig {
let mut config = NetworkConfig::default();
Expand All @@ -96,6 +97,7 @@ pub fn build_config(
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
config.disable_peer_scoring = disable_peer_scoring;
config.inbound_rate_limiter_config = inbound_rate_limiter;
config
}
Expand All @@ -106,9 +108,10 @@ pub async fn build_libp2p_instance(
log: slog::Logger,
fork_name: ForkName,
spec: &ChainSpec,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> Libp2pInstance {
let config = build_config(boot_nodes, inbound_rate_limiter);
let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);
// launch libp2p service

let (signal, exit) = async_channel::bounded(1);
Expand Down Expand Up @@ -150,6 +153,7 @@ pub async fn build_node_pair(
fork_name: ForkName,
spec: &ChainSpec,
protocol: Protocol,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> (Libp2pInstance, Libp2pInstance) {
let sender_log = log.new(o!("who" => "sender"));
Expand All @@ -161,6 +165,7 @@ pub async fn build_node_pair(
sender_log,
fork_name,
spec,
disable_peer_scoring,
inbound_rate_limiter.clone(),
)
.await;
Expand All @@ -170,6 +175,7 @@ pub async fn build_node_pair(
receiver_log,
fork_name,
spec,
disable_peer_scoring,
inbound_rate_limiter,
)
.await;
Expand Down Expand Up @@ -246,7 +252,16 @@ pub async fn build_linear(
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(
build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec, None).await,
build_libp2p_instance(
rt.clone(),
vec![],
log.clone(),
fork_name,
spec,
false,
None,
)
.await,
);
}

Expand Down
Loading

0 comments on commit 7e0c630

Please sign in to comment.