Skip to content

Commit

Permalink
WIP request size limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Jun 18, 2024
1 parent c0ae632 commit 29e3f00
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 45 deletions.
136 changes: 91 additions & 45 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use methods::{
};
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
use crate::rpc::rate_limiter::RequestSizeLimiter;

use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
Expand Down Expand Up @@ -122,6 +123,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
limiter: Option<RateLimiter>,
/// Rate limiter for our responses. This is shared with RPCHandlers.
response_limiter: Option<Arc<Mutex<RateLimiter>>>,
///
request_size_limiter: Option<RequestSizeLimiter>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, E>>,
/// Queue of events to be processed.
Expand Down Expand Up @@ -152,21 +155,27 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
// });
let inbound_limiter = None; // TODO

let response_limiter = inbound_rate_limiter_config.map(|config| {
let response_limiter = inbound_rate_limiter_config.clone().map(|config| {
debug!(log, "Using response rate limiting params"; "config" => ?config);
Arc::new(Mutex::new(
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid"),
))
});

let request_size_limiter = inbound_rate_limiter_config.map(|config| {
RequestSizeLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
});

let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});

RPC {
limiter: inbound_limiter,
response_limiter,
request_size_limiter,
self_limiter,
events: Vec::new(),
fork_context,
Expand Down Expand Up @@ -315,58 +324,95 @@ where
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.versioned_protocol().protocol();
if matches!(
if let Some(limiter) = self.request_size_limiter.as_ref() {
println!("req limiter");
// if let Some(limiter) = self.limiter.as_mut() {
// Check if the request is conformant to the quota
if limiter.allows(req) {
println!("req limiter allowed");
// Send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
} else {
println!("req limiter NOT allowed");
// We set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.versioned_protocol().protocol();
if matches!(
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
) {
debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
"request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// Send an error code to the peer.
// The handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
// match limiter.allows(&peer_id, req) {
// Ok(()) => {
// // send the event to the user
// self.events.push(ToSwarm::GenerateEvent(RPCMessage {
// peer_id,
// conn_id,
// event,
// }))
// }
// Err(RateLimitedErr::TooLarge) => {
// // we set the batch sizes, so this is a coding/config err for most protocols
// let protocol = req.versioned_protocol().protocol();
// if matches!(
// protocol,
// Protocol::BlocksByRange
// | Protocol::BlobsByRange
// | Protocol::BlocksByRoot
// | Protocol::BlobsByRoot
// ) {
// debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
// } else {
// // Other protocols shouldn't be sending large messages, we should flag the peer kind
// crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
// }
// // send an error code to the peer.
// // the handler upon receiving the error code will send it back to the behaviour
// self.send_response(
// peer_id,
// (conn_id, *id),
// RPCCodedResponse::Error(
// RPCResponseErrorCode::RateLimited,
// "Rate limited. Request too large".into(),
// ),
// );
// }
// Err(RateLimitedErr::TooSoon(wait_time)) => {
// debug!(self.log, "Request exceeds the rate limit";
// "request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// // send an error code to the peer.
// // the handler upon receiving the error code will send it back to the behaviour
// self.send_response(
// peer_id,
// (conn_id, *id),
// RPCCodedResponse::Error(
// RPCResponseErrorCode::RateLimited,
// format!("Wait {:?}", wait_time).into(),
// ),
// );
// }
// }
} else {
// No rate limiting, send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
Expand Down
83 changes: 83 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,89 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
}
}

/// Checks whether a single inbound request is small enough to *ever* be served
/// under the configured per-protocol quotas, without tracking per-peer state.
///
/// Each field stores a pre-computed `(tau, t)` pair in nanoseconds for one
/// protocol, where `tau` is the time required to replenish a full token bucket
/// (`Quota::replenish_all_every`) and `t` is the time required to replenish a
/// single token (`tau / max_tokens`). A request needing more replenish time
/// than `tau` can never be processed and is rejected outright.
pub(super) struct RequestSizeLimiter {
    /// `(tau, t)` for `Protocol::Ping`.
    ping: (Nanosecs, Nanosecs),
    /// `(tau, t)` for `Protocol::Status`.
    status: (Nanosecs, Nanosecs),
    /// `(tau, t)` for `Protocol::MetaData`.
    meta_data: (Nanosecs, Nanosecs),
    /// `(tau, t)` for `Protocol::Goodbye`.
    goodbye: (Nanosecs, Nanosecs),
    /// `(tau, t)` for `Protocol::BlocksByRange`.
    blocks_by_range: (Nanosecs, Nanosecs),
    /// `(tau, t)` for `Protocol::BlocksByRoot`.
    blocks_by_root: (Nanosecs, Nanosecs),
    // NOTE(review): blob and light-client protocols have no entry yet — this
    // type is WIP (see the `_` arms in `allows`).
}

impl RequestSizeLimiter {
    /// Builds a `RequestSizeLimiter` from the inbound rate-limiter config.
    ///
    /// For each configured quota this pre-computes `(tau, t)` in nanoseconds:
    /// `tau` is the time to replenish a full bucket
    /// (`quota.replenish_all_every`) and `t` is the time to replenish a single
    /// token (`tau / max_tokens`).
    ///
    /// Returns an `Err` if a quota has a zero replenish period, zero max
    /// tokens, or a replenish period too large to fit in `Nanosecs`.
    pub fn new_with_config(config: RateLimiterConfig) -> Result<Self, &'static str> {
        // Destructure to make sure every configuration value is acknowledged.
        let RateLimiterConfig {
            ping_quota,
            meta_data_quota,
            status_quota,
            goodbye_quota,
            blocks_by_range_quota,
            blocks_by_root_quota,
            // TODO(WIP): the quotas below are not yet enforced by `allows`;
            // bound with `_` to avoid unused-variable warnings until they are.
            blobs_by_range_quota: _,
            blobs_by_root_quota: _,
            light_client_bootstrap_quota: _,
            light_client_optimistic_update_quota: _,
            light_client_finality_update_quota: _,
        } = config;

        // Compute `(tau, t)` for one quota, validating its parameters.
        let tau_and_t = |quota: &Quota| {
            let tau = quota.replenish_all_every.as_nanos();
            if tau == 0 {
                return Err("Replenish time must be positive");
            }
            // Guard the division below: a zero-token quota is invalid.
            if quota.max_tokens == 0 {
                return Err("Max number of tokens must be positive");
            }
            let t = (tau / quota.max_tokens as u128)
                .try_into()
                .map_err(|_| "total replenish time is too long")?;
            let tau = tau
                .try_into()
                .map_err(|_| "total replenish time is too long")?;
            Ok((tau, t))
        };

        Ok(Self {
            ping: tau_and_t(&ping_quota)?,
            meta_data: tau_and_t(&meta_data_quota)?,
            status: tau_and_t(&status_quota)?,
            goodbye: tau_and_t(&goodbye_quota)?,
            blocks_by_range: tau_and_t(&blocks_by_range_quota)?,
            blocks_by_root: tau_and_t(&blocks_by_root_quota)?,
        })
    }

    /// Returns `true` if `request` could *ever* be served under the quota
    /// configured for its protocol, regardless of the bucket's current level.
    ///
    /// A request asking for `tokens` responses needs `t * tokens` nanoseconds
    /// of replenishment; if that exceeds `tau` (the time to fill the whole
    /// bucket) the batch can never be processed and is rejected.
    pub fn allows<Item: RateLimiterItem>(&self, request: &Item) -> bool {
        let tokens = request.max_responses().max(1);
        let (tau, t) = match request.protocol() {
            Protocol::Ping => self.ping,
            Protocol::Status => self.status,
            Protocol::MetaData => self.meta_data,
            Protocol::Goodbye => self.goodbye,
            Protocol::BlocksByRange => self.blocks_by_range,
            Protocol::BlocksByRoot => self.blocks_by_root,
            // TODO(WIP): blob and light-client protocols have no size limit
            // configured yet. Allow them rather than panicking (`todo!()`)
            // at runtime on a peer-triggered code path.
            _ => return true,
        };

        // How long it takes to replenish enough tokens for this request.
        // Saturate on overflow: a request that large can certainly never fit
        // in the bucket, so saturation correctly leads to rejection.
        let additional_time = t.saturating_mul(tokens);

        // If replenishing the requested tokens takes longer than filling the
        // whole bucket, this batch can _never_ be processed.
        additional_time <= tau
    }
}

#[cfg(test)]
mod tests {
use crate::rpc::rate_limiter::{Limiter, Quota};
Expand Down
103 changes: 103 additions & 0 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,3 +1297,106 @@ fn test_delayed_rpc_response() {
}
})
}

#[test]
// Checks that requests whose response batch can never fit within the
// configured quota are rejected with `RateLimited` before ever reaching the
// receiver's behaviour.
fn test_request_too_large() {
    let rt = Arc::new(Runtime::new().unwrap());
    let log = logging::test_logger();
    let spec = E::default_spec();

    rt.block_on(async {
        // Get sender/receiver.
        let (mut sender, mut receiver) = common::build_node_pair(
            Arc::downgrade(&rt),
            &log,
            ForkName::Base,
            &spec,
            Protocol::Tcp,
            // Configure quotas small enough that the requests below can never
            // be served within a full replenish period.
            Some(
                "beacon_blocks_by_range:1/1024;beacon_blocks_by_root:1/128"
                    .parse()
                    .unwrap(),
            ),
        )
        .await;

        // Requests that exceed the configured quotas; sent one at a time,
        // popped from the back.
        let mut rpc_requests = vec![
            Request::BlocksByRange(BlocksByRangeRequest::new(0, 2)),
            Request::BlocksByRoot(BlocksByRootRequest::new(
                vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)],
                &spec,
            )),
        ];

        // Build the sender future.
        let sender_future = async {
            let mut request_id = 1;
            loop {
                match sender.next_event().await {
                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                        let request = rpc_requests.pop().unwrap();
                        debug!(log, "Sending RPC request"; "request_id" => request_id, "request" => ?request);
                        sender.send_request(peer_id, request_id, request);
                    }
                    NetworkEvent::ResponseReceived { id, response, .. } => {
                        // TODO(WIP): a stream-termination response arrives
                        // here; handle it properly.
                        debug!(log, "ResponseReceived"; "request_id" => id, "response" => ?response);
                        match response {
                            Response::BlobsByRoot(None) => {}
                            _ => unreachable!(),
                        }
                    }
                    NetworkEvent::RPCFailed { id, peer_id, error } => {
                        debug!(log, "RPC Failed"; "error" => ?error, "request_id" => id);
                        // Every over-sized request must fail with `RateLimited`.
                        assert_eq!(id, request_id);
                        assert!(matches!(
                            error,
                            RPCError::ErrorResponse(RPCResponseErrorCode::RateLimited, ..)
                        ));
                        if let Some(request) = rpc_requests.pop() {
                            request_id += 1;
                            debug!(log, "Sending RPC request"; "request_id" => request_id, "request" => ?request);
                            sender.send_request(peer_id, request_id, request);
                        } else {
                            // All requests were rejected as expected.
                            return;
                        }
                    }
                    _ => {}
                }
            }
        };

        // Build the receiver future. The receiver must never observe a
        // request: each one is rejected before reaching the behaviour.
        let receiver_future = async {
            loop {
                if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await {
                    unreachable!();
                }
            }
        };

        tokio::select! {
            _ = sender_future => {}
            _ = receiver_future => {}
            _ = sleep(Duration::from_secs(30)) => {
                panic!("Future timed out");
            }
        }
    })
}

0 comments on commit 29e3f00

Please sign in to comment.