Skip to content

Commit

Permalink
feat(l1): initiate rlpx connections from discv4 (#936)
Browse files Browse the repository at this point in the history
**Motivation**

DiscV4 protocol discovers new peers, and after deciding it is a valid
peer, a TCP connection is established and RLPx protocol starts

**Description**

Now, when receiving a DiscV4 Pong message and after evaluating the peer,
a RLPx connection is created and established. If handshake fails it
removes the peer from the Kademlia table.

Closes #837.

Also removed some hard-coded testing code.

Once the listen loop is built (#840) there may be other conditions where
a peer has to be discarded. (eg. when the other peer sends a Disconnect
message). After #840 is completed we may create some more issues on this
regard.

---------

Co-authored-by: Maximo Palopoli <[email protected]>
Co-authored-by: Javier Rodríguez Chatruc <[email protected]>
Co-authored-by: Federico Borello <[email protected]>
Co-authored-by: Marcos Nicolau <[email protected]>
Co-authored-by: Rodrigo Oliveri <[email protected]>
Co-authored-by: Ivan Litteri <[email protected]>
Co-authored-by: Manuel Iñaki Bilbao <[email protected]>
Co-authored-by: Martin Paulucci <[email protected]>
Co-authored-by: François <[email protected]>
  • Loading branch information
10 people authored Oct 23, 2024
1 parent 80f6448 commit 943a395
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 80 deletions.
2 changes: 2 additions & 0 deletions crates/networking/p2p/bootnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ impl FromStr for BootNode {
type Err = ParseIntError;
/// Takes a str with the format "enode://nodeID@IPaddress:port" and
/// parses it to a BootNode
// TODO: fix it to support different UDP and TCP ports, according to
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/905
fn from_str(input: &str) -> Result<BootNode, ParseIntError> {
// TODO: error handling
let node_id = H512::from_str(&input[8..136]).expect("Failed to parse node id");
Expand Down
134 changes: 60 additions & 74 deletions crates/networking/p2p/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,32 @@ pub async fn start_network(
) {
info!("Starting discovery service at {udp_addr}");
info!("Listening for requests at {tcp_addr}");
let local_node_id = node_id_from_signing_key(&signer);
let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id)));

let discovery_handle = tokio::spawn(discover_peers(udp_addr, signer.clone(), bootnodes));
let server_handle = tokio::spawn(serve_requests(tcp_addr, signer.clone(), storage.clone()));
// TODO Remove this spawn, it's just for testing
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
tokio::spawn(start_hardcoded_connection(tcp_addr, signer, storage));
let discovery_handle = tokio::spawn(discover_peers(
udp_addr,
signer.clone(),
table.clone(),
bootnodes,
));
let server_handle = tokio::spawn(serve_requests(
tcp_addr,
signer.clone(),
storage.clone(),
table.clone(),
));

try_join!(discovery_handle, server_handle).unwrap();
}

async fn discover_peers(udp_addr: SocketAddr, signer: SigningKey, bootnodes: Vec<BootNode>) {
async fn discover_peers(
udp_addr: SocketAddr,
signer: SigningKey,
table: Arc<Mutex<KademliaTable>>,
bootnodes: Vec<BootNode>,
) {
let udp_socket = Arc::new(UdpSocket::bind(udp_addr).await.unwrap());
let local_node_id = node_id_from_signing_key(&signer);
let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id)));

let server_handler = tokio::spawn(discover_peers_server(
udp_addr,
Expand Down Expand Up @@ -89,7 +101,7 @@ async fn discover_peers(udp_addr: SocketAddr, signer: SigningKey, bootnodes: Vec
udp_socket.clone(),
table.clone(),
signer.clone(),
local_node_id,
node_id_from_signing_key(&signer),
PEERS_RANDOM_LOOKUP_TIME_IN_MIN as u64 * 60,
));

Expand Down Expand Up @@ -165,6 +177,7 @@ async fn discover_peers_server(
}
}
Message::Pong(msg) => {
let table = table.clone();
if is_expired(msg.expiration) {
debug!("Ignoring pong as it is expired.");
continue;
Expand All @@ -180,17 +193,14 @@ async fn discover_peers_server(
}
if peer.last_ping_hash.unwrap() == msg.ping_hash {
table.lock().await.pong_answered(peer.node.node_id);
// TODO: make this work to initiate p2p thread
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
let _node = peer.node;
let mut msg_buf = vec![0; read - (32 + 65)];
buf[32 + 65..read].clone_into(&mut msg_buf);
let mut sig_bytes = vec![0; 65];
buf[32..32 + 65].clone_into(&mut sig_bytes);
let _signer_clone = signer.clone();
// tokio::spawn(async move {
// handle_peer_as_initiator(signer_clone, &msg_buf, &node).await;
// });

let mut msg_buf = vec![0; read - 32];
buf[32..read].clone_into(&mut msg_buf);
let signer_clone = signer.clone();
tokio::spawn(async move {
handle_peer_as_initiator(signer_clone, &msg_buf, &peer.node, table)
.await;
});
} else {
debug!(
"Discarding pong as the hash did not match the last corresponding ping"
Expand Down Expand Up @@ -306,7 +316,9 @@ async fn discovery_startup(
table.lock().await.insert_node(Node {
ip: bootnode.socket_address.ip(),
udp_port: bootnode.socket_address.port(),
tcp_port: 0,
// TODO: udp port can differ from tcp port.
// see https://github.com/lambdaclass/lambda_ethereum_rust/issues/905
tcp_port: bootnode.socket_address.port(),
node_id: bootnode.node_id,
});
let ping_hash = ping(&udp_socket, udp_addr, bootnode.socket_address, &signer).await;
Expand Down Expand Up @@ -712,82 +724,55 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer:
let _ = socket.send_to(&buf, to_addr).await;
}

// TODO: remove this function. It's used for a hardcoded test
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
async fn start_hardcoded_connection(tcp_addr: SocketAddr, signer: SigningKey, _storage: Store) {
let mut udp_addr = tcp_addr;
udp_addr.set_port(tcp_addr.port() + 1);
let udp_socket = UdpSocket::bind(udp_addr).await.unwrap();

// Try contacting a known peer
// TODO: this is just an example, and we should do this dynamically
let str_udp_addr = "127.0.0.1:30307";

let udp_addr: SocketAddr = str_udp_addr.parse().unwrap();

let mut buf = vec![0; MAX_DISC_PACKET_SIZE];

let (msg, endpoint, node_id) = loop {
ping(&udp_socket, tcp_addr, udp_addr, &signer).await;

let (read, from) = udp_socket.recv_from(&mut buf).await.unwrap();
debug!("RLPx: Received {read} bytes from {from}");
let packet = Packet::decode(&buf[..read]).unwrap();
debug!("RLPx: Message: {:?}", packet);

match packet.get_message() {
Message::Pong(pong) => {
break (&buf[32..read], pong.to, packet.get_node_id());
}
Message::Ping(ping) => {
break (&buf[32..read], ping.from, packet.get_node_id());
}
_ => {
tracing::warn!("Unexpected message type");
}
};
};

let node = Node {
ip: endpoint.ip,
udp_port: 30307u16, //endpoint.udp_port,
tcp_port: 30307u16, //endpoint.tcp_port,
node_id,
};
handle_peer_as_initiator(signer, msg, &node).await;
}

// TODO build a proper listen loop that receives requests from both
// peers and business layer and propagate storage to use when required
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/840
async fn serve_requests(tcp_addr: SocketAddr, signer: SigningKey, _storage: Store) {
async fn serve_requests(
tcp_addr: SocketAddr,
signer: SigningKey,
_storage: Store,
table: Arc<Mutex<KademliaTable>>,
) {
let tcp_socket = TcpSocket::new_v4().unwrap();
tcp_socket.bind(tcp_addr).unwrap();
let listener = tcp_socket.listen(50).unwrap();
loop {
let (stream, _peer_addr) = listener.accept().await.unwrap();

tokio::spawn(handle_peer_as_receiver(signer.clone(), stream));
tokio::spawn(handle_peer_as_receiver(
signer.clone(),
stream,
table.clone(),
));
}
}

async fn handle_peer_as_receiver(signer: SigningKey, stream: TcpStream) {
async fn handle_peer_as_receiver(
signer: SigningKey,
stream: TcpStream,
table: Arc<Mutex<KademliaTable>>,
) {
let conn = RLPxConnection::receiver(signer, stream);
handle_peer(conn).await;
handle_peer(conn, table).await;
}

async fn handle_peer_as_initiator(signer: SigningKey, msg: &[u8], node: &Node) {
async fn handle_peer_as_initiator(
signer: SigningKey,
msg: &[u8],
node: &Node,
table: Arc<Mutex<KademliaTable>>,
) {
info!("Trying RLPx connection with {node:?}");
let stream = TcpSocket::new_v4()
.unwrap()
.connect(SocketAddr::new(node.ip, node.tcp_port))
.await
.unwrap();
let conn = RLPxConnection::initiator(signer, msg, stream).await;
handle_peer(conn).await;
handle_peer(conn, table).await;
}

async fn handle_peer(mut conn: RLPxConnection<TcpStream>) {
async fn handle_peer(mut conn: RLPxConnection<TcpStream>, table: Arc<Mutex<KademliaTable>>) {
match conn.handshake().await {
Ok(_) => {
// TODO Properly build listen loop
Expand All @@ -797,8 +782,9 @@ async fn handle_peer(mut conn: RLPxConnection<TcpStream>) {
// }
}
Err(e) => {
// TODO propagate error to eventually discard peer from kademlia table
// Discard peer from kademlia table
info!("Handshake failed, discarding peer: ({e})");
table.lock().await.replace_peer(conn.get_remote_node_id());
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
_ => panic!("Received an unexpected message"),
}
}

pub fn get_remote_node_id(&self) -> H512 {
match &self.state {
RLPxConnectionState::Established(state) => state.remote_node_id,
// TODO proper error
_ => panic!("Invalid state"),
}
}
}

enum RLPxConnectionState {
Expand Down Expand Up @@ -358,6 +366,7 @@ impl ReceivedAuth {
}

struct InitiatedAuth {
pub(crate) remote_node_id: H512,
pub(crate) local_nonce: H256,
pub(crate) local_ephemeral_key: SecretKey,
pub(crate) local_init_message: Vec<u8>,
Expand All @@ -366,6 +375,7 @@ struct InitiatedAuth {
impl InitiatedAuth {
pub fn new(previous_state: &Initiator, local_init_message: Vec<u8>) -> Self {
Self {
remote_node_id: previous_state.remote_node_id,
local_nonce: previous_state.nonce,
local_ephemeral_key: previous_state.ephemeral_key.clone(),
local_init_message,
Expand All @@ -374,6 +384,7 @@ impl InitiatedAuth {
}

pub struct Established {
pub remote_node_id: H512,
pub(crate) mac_key: H256,
pub ingress_mac: Keccak256,
pub egress_mac: Keccak256,
Expand All @@ -391,6 +402,7 @@ impl Established {
.into();

Self::new(
previous_state.remote_node_id,
init_message,
previous_state.local_nonce,
previous_state.local_ephemeral_key.clone(),
Expand All @@ -413,6 +425,7 @@ impl Established {
Keccak256::digest([remote_nonce.0, previous_state.local_nonce.0].concat()).into();

Self::new(
previous_state.remote_node_id,
previous_state.local_init_message.clone(),
previous_state.local_nonce,
previous_state.local_ephemeral_key.clone(),
Expand All @@ -423,7 +436,9 @@ impl Established {
)
}

#[allow(clippy::too_many_arguments)]
fn new(
remote_node_id: H512,
local_init_message: Vec<u8>,
local_nonce: H256,
local_ephemeral_key: SecretKey,
Expand Down Expand Up @@ -456,6 +471,7 @@ impl Established {
let ingress_aes = <Aes256Ctr64BE as KeyIvInit>::new(&aes_key.0.into(), &[0; 16].into());
let egress_aes = ingress_aes.clone();
Self {
remote_node_id,
mac_key,
ingress_mac,
egress_mac,
Expand Down
2 changes: 0 additions & 2 deletions crates/networking/p2p/rlpx/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub(crate) fn decode_ack_message(
}

fn decrypt_message(static_key: &SecretKey, msg: &[u8], auth_data: [u8; 2]) -> Vec<u8> {
info!("msg {msg:?}");

// Split the message into its components. General layout is:
// public-key (65) || iv (16) || ciphertext || mac (32)
let (pk, rest) = msg.split_at(65);
Expand Down
10 changes: 7 additions & 3 deletions crates/networking/p2p/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ impl Node {
pub fn enode_url(&self) -> String {
let node_id = hex::encode(self.node_id);
let node_ip = self.ip;
let discovery_port = self.tcp_port;
let listener_port = self.udp_port;
format!("enode://{node_id}@{node_ip}:{listener_port}?discport={discovery_port}")
let discovery_port = self.udp_port;
let listener_port = self.tcp_port;
if discovery_port != listener_port {
format!("enode://{node_id}@{node_ip}:{listener_port}?discport={discovery_port}")
} else {
format!("enode://{node_id}@{node_ip}:{listener_port}")
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/networking/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ mod tests {
let result = map_http_requests(&request, storage, local_p2p_node, Default::default());
let rpc_response = rpc_response(request.id, result);
let expected_response = to_rpc_response_success_value(
r#"{"jsonrpc":"2.0","id":1,"result":{"enode":"enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@127.0.0.1:30303?discport=30303","id":"d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666","ip":"127.0.0.1","name":"ethereum_rust/0.1.0/rust1.80","ports":{"discovery":30303,"listener":30303},"protocols":{"eth":{"chainId":3151908,"homesteadBlock":0,"daoForkBlock":null,"daoForkSupport":false,"eip150Block":0,"eip155Block":0,"eip158Block":0,"byzantiumBlock":0,"constantinopleBlock":0,"petersburgBlock":0,"istanbulBlock":0,"muirGlacierBlock":null,"berlinBlock":0,"londonBlock":0,"arrowGlacierBlock":null,"grayGlacierBlock":null,"mergeNetsplitBlock":0,"shanghaiTime":0,"cancunTime":0,"pragueTime":1718232101,"verkleTime":null,"terminalTotalDifficulty":0,"terminalTotalDifficultyPassed":true}}}}"#,
r#"{"jsonrpc":"2.0","id":1,"result":{"enode":"enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@127.0.0.1:30303","id":"d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666","ip":"127.0.0.1","name":"ethereum_rust/0.1.0/rust1.80","ports":{"discovery":30303,"listener":30303},"protocols":{"eth":{"chainId":3151908,"homesteadBlock":0,"daoForkBlock":null,"daoForkSupport":false,"eip150Block":0,"eip155Block":0,"eip158Block":0,"byzantiumBlock":0,"constantinopleBlock":0,"petersburgBlock":0,"istanbulBlock":0,"muirGlacierBlock":null,"berlinBlock":0,"londonBlock":0,"arrowGlacierBlock":null,"grayGlacierBlock":null,"mergeNetsplitBlock":0,"shanghaiTime":0,"cancunTime":0,"pragueTime":1718232101,"verkleTime":null,"terminalTotalDifficulty":0,"terminalTotalDifficultyPassed":true}}}}"#,
);
assert_eq!(rpc_response.to_string(), expected_response.to_string())
}
Expand Down

0 comments on commit 943a395

Please sign in to comment.