Skip to content

Commit

Permalink
add a unique integer id to Rpc requests (#6444)
Browse files Browse the repository at this point in the history
* add id to rpc requests

* rename rpc request and response types for more accurate meaning

* remove unrequired build_request function

* remove unirequired Request wrapper types and unify Outbound and Inbound Request

* add RequestId to NetworkMessage::SendResponse

,NetworkMessage::SendErrorResponse to be passed to Rpc::send_response
  • Loading branch information
jxs authored Oct 1, 2024
1 parent 5d1ff7c commit 82098e1
Show file tree
Hide file tree
Showing 20 changed files with 1,324 additions and 1,043 deletions.
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ pub use peer_manager::{
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
};
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub use service::api_types::{PeerRequestId, Request, Response};
pub use service::api_types::{PeerRequestId, Response};
pub use service::utils::*;
pub use service::{Gossipsub, NetworkEvent};
14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::discovery::enr_ext::EnrExt;
use crate::discovery::peer_id_to_node_id;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RpcErrorResponse};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, metrics, Gossipsub, NetworkGlobals, PeerId, Subnet, SubnetDiscovery};
use delay_map::HashSetDelay;
Expand Down Expand Up @@ -526,8 +526,8 @@ impl<E: EthSpec> PeerManager<E> {
PeerAction::HighToleranceError
}
RPCError::ErrorResponse(code, _) => match code {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => {
RpcErrorResponse::Unknown => PeerAction::HighToleranceError,
RpcErrorResponse::ResourceUnavailable => {
// Don't ban on this because we want to retry with a block by root request.
if matches!(
protocol,
Expand Down Expand Up @@ -558,9 +558,9 @@ impl<E: EthSpec> PeerManager<E> {
ConnectionDirection::Incoming => return,
}
}
RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError,
RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError,
RPCResponseErrorCode::RateLimited => match protocol {
RpcErrorResponse::ServerError => PeerAction::MidToleranceError,
RpcErrorResponse::InvalidRequest => PeerAction::LowToleranceError,
RpcErrorResponse::RateLimited => match protocol {
Protocol::Ping => PeerAction::MidToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Expand All @@ -577,7 +577,7 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
},
RPCResponseErrorCode::BlobsNotFoundForBlock => PeerAction::LowToleranceError,
RpcErrorResponse::BlobsNotFoundForBlock => PeerAction::LowToleranceError,
},
RPCError::SSZDecodeError(_) => PeerAction::Fatal,
RPCError::UnsupportedProtocol => {
Expand Down
494 changes: 273 additions & 221 deletions beacon_node/lighthouse_network/src/rpc/codec.rs

Large diffs are not rendered by default.

59 changes: 31 additions & 28 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]

use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
use super::RequestId;
use super::{RPCReceived, RPCSend, ReqId, Request};
use crate::rpc::outbound::OutboundFramed;
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
Expand Down Expand Up @@ -95,7 +96,7 @@ where
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(Id, OutboundRequest<E>); 4]>,
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
Expand Down Expand Up @@ -159,7 +160,7 @@ struct InboundInfo<E: EthSpec> {
/// State of the substream.
state: InboundState<E>,
/// Responses queued for sending.
pending_items: VecDeque<RPCCodedResponse<E>>,
pending_items: VecDeque<RpcResponse<E>>,
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
Expand Down Expand Up @@ -205,7 +206,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<Stream, E>>,
/// Keeps track of the actual request sent.
request: OutboundRequest<E>,
request: RequestType<E>,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<Stream, E>>),
Expand Down Expand Up @@ -263,7 +264,7 @@ where

// Queue our goodbye message.
if let Some((id, reason)) = goodbye_reason {
self.dial_queue.push((id, OutboundRequest::Goodbye(reason)));
self.dial_queue.push((id, RequestType::Goodbye(reason)));
}

self.state = HandlerState::ShuttingDown(Box::pin(sleep(Duration::from_secs(
Expand All @@ -273,7 +274,7 @@ where
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: Id, req: OutboundRequest<E>) {
fn send_request(&mut self, id: Id, req: RequestType<E>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
Expand All @@ -291,18 +292,18 @@ where
/// Sends a response to a peer's request.
// NOTE: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse<E>) {
fn send_response(&mut self, inbound_id: SubstreamId, response: RpcResponse<E>) {
// check if the stream matching the response still exists
let Some(inbound_info) = self.inbound_substreams.get_mut(&inbound_id) else {
if !matches!(response, RPCCodedResponse::StreamTermination(..)) {
if !matches!(response, RpcResponse::StreamTermination(..)) {
// the stream is closed after sending the expected number of responses
trace!(self.log, "Inbound stream has expired. Response not sent";
"response" => %response, "id" => inbound_id);
}
return;
};
// If the response we are sending is an error, report back for handling
if let RPCCodedResponse::Error(ref code, ref reason) = response {
if let RpcResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::ErrorResponse(*code, reason.to_string()),
proto: inbound_info.protocol,
Expand All @@ -329,7 +330,7 @@ where
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, OutboundRequest<E>); // Keep track of the id and the request
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
Expand Down Expand Up @@ -403,8 +404,8 @@ where

if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push_back(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
info.pending_items.push_back(RpcResponse::Error(
RpcErrorResponse::ServerError,
"Request timed out".into(),
));
}
Expand Down Expand Up @@ -672,13 +673,13 @@ where
let proto = entry.get().proto;

let received = match response {
RPCCodedResponse::StreamTermination(t) => {
RpcResponse::StreamTermination(t) => {
HandlerEvent::Ok(RPCReceived::EndOfStream(id, t))
}
RPCCodedResponse::Success(resp) => {
RpcResponse::Success(resp) => {
HandlerEvent::Ok(RPCReceived::Response(id, resp))
}
RPCCodedResponse::Error(ref code, ref r) => {
RpcResponse::Error(ref code, ref r) => {
HandlerEvent::Err(HandlerErr::Outbound {
id,
proto,
Expand Down Expand Up @@ -888,21 +889,23 @@ where
}

// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
if let RequestType::Goodbye(_) = req {
self.shutdown(None);
}

self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.events_out
.push(HandlerEvent::Ok(RPCReceived::Request(Request {
id: RequestId::next(),
substream_id: self.current_inbound_substream_id,
r#type: req,
})));
self.current_inbound_substream_id.0 += 1;
}

fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, OutboundRequest<E>),
(id, request): (Id, RequestType<E>),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
Expand Down Expand Up @@ -958,7 +961,7 @@ where
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, OutboundRequest<E>),
request_info: (Id, RequestType<E>),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;
Expand Down Expand Up @@ -1016,15 +1019,15 @@ impl slog::Value for SubstreamId {
/// error that occurred with sending a message is reported also.
async fn send_message_to_inbound_substream<E: EthSpec>(
mut substream: InboundSubstream<E>,
message: RPCCodedResponse<E>,
message: RpcResponse<E>,
last_chunk: bool,
) -> Result<(InboundSubstream<E>, bool), RPCError> {
if matches!(message, RPCCodedResponse::StreamTermination(_)) {
if matches!(message, RpcResponse::StreamTermination(_)) {
substream.close().await.map(|_| (substream, true))
} else {
// chunks that are not stream terminations get sent, and the stream is closed if
// the response is an error
let is_error = matches!(message, RPCCodedResponse::Error(..));
let is_error = matches!(message, RpcResponse::Error(..));

let send_result = substream.send(message).await;

Expand Down
Loading

0 comments on commit 82098e1

Please sign in to comment.