Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(l1): initiate rlpx connections from discv4 #936

Merged
merged 23 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6ff269e
Add initiator handler for RLPx
ElFantasma Oct 21, 2024
03804eb
Merge branch 'main' into 837-initiate-rlpx-connections-from-discv4
ElFantasma Oct 21, 2024
e4ede46
Merge branch 'main' into 837-initiate-rlpx-connections-from-discv4
ElFantasma Oct 21, 2024
b2c132b
removed log message
ElFantasma Oct 22, 2024
fbcc695
Merge branch 'main' into 837-initiate-rlpx-connections-from-discv4
ElFantasma Oct 22, 2024
8ccbfc3
Removing peer from kademlia table if handshake fails
ElFantasma Oct 22, 2024
0ecb3a2
fix(levm): change Makefile's lint (#925)
maximopalopoli Oct 22, 2024
a0abe69
docs(l2): update README (#929)
jrchatruc Oct 22, 2024
72deb49
fix(l1): add needed patch for risc0's zkVM (#932)
fborello-lambda Oct 22, 2024
85760c2
docs(l1): add more explicit references to geth (#924)
MarcosNicolau Oct 22, 2024
b5ac694
docs(l1): readme update and kurtosis services addition (#928)
rodrigo-o Oct 22, 2024
e58d867
feat(l2): improve `Makefile` (#937)
ilitteri Oct 22, 2024
bdde416
feat(levm): add transact validations (#921)
maximopalopoli Oct 22, 2024
0d5bc07
fix(core): change `RpcRequestId` from `i32` to `u64` (#941)
ManuelBilbao Oct 22, 2024
6b29b31
feat(l1): add P2P receipt messages. (#885)
mpaulucci Oct 23, 2024
d6c84ba
docs(l2): fix readme (#948)
jrchatruc Oct 23, 2024
269c78a
refactor(l1): move P2P blocks into a separate module. (#947)
mpaulucci Oct 23, 2024
09d9bcb
refactor(levm): move opcodes to their corresponding folder (#863) (#910)
gsbujo Oct 23, 2024
25f81e3
feat(l2): add transaction commands (#918)
ManuelBilbao Oct 23, 2024
d50f5d6
fix(l2): add clippy suggestions (#951)
fborello-lambda Oct 23, 2024
f3fe5cc
removed incorrect port asignment
ElFantasma Oct 23, 2024
801d8ad
Fixed broken test
ElFantasma Oct 23, 2024
9dbec3b
Merge branch 'main' into 837-initiate-rlpx-connections-from-discv4
ElFantasma Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/networking/p2p/bootnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ impl FromStr for BootNode {
type Err = ParseIntError;
/// Takes a str with the format "enode://nodeID@IPaddress:port" and
/// parses it to a BootNode
// TODO: fix it to support different UDP and TCP ports, according to
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/905
fn from_str(input: &str) -> Result<BootNode, ParseIntError> {
// TODO: error handling
let node_id = H512::from_str(&input[8..136]).expect("Failed to parse node id");
Expand Down
137 changes: 62 additions & 75 deletions crates/networking/p2p/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,32 @@ pub async fn start_network(
) {
info!("Starting discovery service at {udp_addr}");
info!("Listening for requests at {tcp_addr}");
let local_node_id = node_id_from_signing_key(&signer);
let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id)));

let discovery_handle = tokio::spawn(discover_peers(udp_addr, signer.clone(), bootnodes));
let server_handle = tokio::spawn(serve_requests(tcp_addr, signer.clone(), storage.clone()));
// TODO Remove this spawn, it's just for testing
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
tokio::spawn(start_hardcoded_connection(tcp_addr, signer, storage));
let discovery_handle = tokio::spawn(discover_peers(
udp_addr,
signer.clone(),
table.clone(),
bootnodes,
));
let server_handle = tokio::spawn(serve_requests(
tcp_addr,
signer.clone(),
storage.clone(),
table.clone(),
));

try_join!(discovery_handle, server_handle).unwrap();
}

async fn discover_peers(udp_addr: SocketAddr, signer: SigningKey, bootnodes: Vec<BootNode>) {
async fn discover_peers(
udp_addr: SocketAddr,
signer: SigningKey,
table: Arc<Mutex<KademliaTable>>,
bootnodes: Vec<BootNode>,
) {
let udp_socket = Arc::new(UdpSocket::bind(udp_addr).await.unwrap());
let local_node_id = node_id_from_signing_key(&signer);
let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id)));

let server_handler = tokio::spawn(discover_peers_server(
udp_addr,
Expand Down Expand Up @@ -89,7 +101,7 @@ async fn discover_peers(udp_addr: SocketAddr, signer: SigningKey, bootnodes: Vec
udp_socket.clone(),
table.clone(),
signer.clone(),
local_node_id,
node_id_from_signing_key(&signer),
PEERS_RANDOM_LOOKUP_TIME_IN_MIN as u64 * 60,
));

Expand Down Expand Up @@ -148,7 +160,8 @@ async fn discover_peers_server(
table.insert_node(Node {
ip: from.ip(),
udp_port: from.port(),
tcp_port: 0,
// TODO: Check how to obtain proper tcp port
fkrause98 marked this conversation as resolved.
Show resolved Hide resolved
tcp_port: from.port(),
node_id: packet.get_node_id(),
})
};
Expand All @@ -165,6 +178,7 @@ async fn discover_peers_server(
}
}
Message::Pong(msg) => {
let table = table.clone();
if is_expired(msg.expiration) {
debug!("Ignoring pong as it is expired.");
continue;
Expand All @@ -180,17 +194,14 @@ async fn discover_peers_server(
}
if peer.last_ping_hash.unwrap() == msg.ping_hash {
table.lock().await.pong_answered(peer.node.node_id);
// TODO: make this work to initiate p2p thread
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
let _node = peer.node;
let mut msg_buf = vec![0; read - (32 + 65)];
buf[32 + 65..read].clone_into(&mut msg_buf);
let mut sig_bytes = vec![0; 65];
buf[32..32 + 65].clone_into(&mut sig_bytes);
let _signer_clone = signer.clone();
// tokio::spawn(async move {
// handle_peer_as_initiator(signer_clone, &msg_buf, &node).await;
// });

let mut msg_buf = vec![0; read - 32];
fkrause98 marked this conversation as resolved.
Show resolved Hide resolved
buf[32..read].clone_into(&mut msg_buf);
fkrause98 marked this conversation as resolved.
Show resolved Hide resolved
let signer_clone = signer.clone();
tokio::spawn(async move {
handle_peer_as_initiator(signer_clone, &msg_buf, &peer.node, table)
.await;
});
} else {
debug!(
"Discarding pong as the hash did not match the last corresponding ping"
Expand Down Expand Up @@ -306,7 +317,9 @@ async fn discovery_startup(
table.lock().await.insert_node(Node {
ip: bootnode.socket_address.ip(),
udp_port: bootnode.socket_address.port(),
tcp_port: 0,
// TODO: udp port can differ from tcp port.
// see https://github.com/lambdaclass/lambda_ethereum_rust/issues/905
tcp_port: bootnode.socket_address.port(),
node_id: bootnode.node_id,
});
let ping_hash = ping(&udp_socket, udp_addr, bootnode.socket_address, &signer).await;
Expand Down Expand Up @@ -712,82 +725,55 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer:
let _ = socket.send_to(&buf, to_addr).await;
}

// TODO: remove this function. It's used for a hardcoded test
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/837
async fn start_hardcoded_connection(tcp_addr: SocketAddr, signer: SigningKey, _storage: Store) {
let mut udp_addr = tcp_addr;
udp_addr.set_port(tcp_addr.port() + 1);
let udp_socket = UdpSocket::bind(udp_addr).await.unwrap();

// Try contacting a known peer
// TODO: this is just an example, and we should do this dynamically
let str_udp_addr = "127.0.0.1:30307";

let udp_addr: SocketAddr = str_udp_addr.parse().unwrap();

let mut buf = vec![0; MAX_DISC_PACKET_SIZE];

let (msg, endpoint, node_id) = loop {
ping(&udp_socket, tcp_addr, udp_addr, &signer).await;

let (read, from) = udp_socket.recv_from(&mut buf).await.unwrap();
debug!("RLPx: Received {read} bytes from {from}");
let packet = Packet::decode(&buf[..read]).unwrap();
debug!("RLPx: Message: {:?}", packet);

match packet.get_message() {
Message::Pong(pong) => {
break (&buf[32..read], pong.to, packet.get_node_id());
}
Message::Ping(ping) => {
break (&buf[32..read], ping.from, packet.get_node_id());
}
_ => {
tracing::warn!("Unexpected message type");
}
};
};

let node = Node {
ip: endpoint.ip,
udp_port: 30307u16, //endpoint.udp_port,
tcp_port: 30307u16, //endpoint.tcp_port,
node_id,
};
handle_peer_as_initiator(signer, msg, &node).await;
}

// TODO build a proper listen loop that receives requests from both
// peers and business layer and propagate storage to use when required
// https://github.com/lambdaclass/lambda_ethereum_rust/issues/840
async fn serve_requests(tcp_addr: SocketAddr, signer: SigningKey, _storage: Store) {
async fn serve_requests(
tcp_addr: SocketAddr,
signer: SigningKey,
_storage: Store,
table: Arc<Mutex<KademliaTable>>,
) {
let tcp_socket = TcpSocket::new_v4().unwrap();
tcp_socket.bind(tcp_addr).unwrap();
let listener = tcp_socket.listen(50).unwrap();
loop {
let (stream, _peer_addr) = listener.accept().await.unwrap();

tokio::spawn(handle_peer_as_receiver(signer.clone(), stream));
tokio::spawn(handle_peer_as_receiver(
signer.clone(),
stream,
table.clone(),
));
}
}

async fn handle_peer_as_receiver(signer: SigningKey, stream: TcpStream) {
async fn handle_peer_as_receiver(
signer: SigningKey,
stream: TcpStream,
table: Arc<Mutex<KademliaTable>>,
) {
let conn = RLPxConnection::receiver(signer, stream);
handle_peer(conn).await;
handle_peer(conn, table).await;
}

async fn handle_peer_as_initiator(signer: SigningKey, msg: &[u8], node: &Node) {
async fn handle_peer_as_initiator(
signer: SigningKey,
msg: &[u8],
node: &Node,
table: Arc<Mutex<KademliaTable>>,
) {
info!("Trying RLPx connection with {node:?}");
let stream = TcpSocket::new_v4()
.unwrap()
.connect(SocketAddr::new(node.ip, node.tcp_port))
.await
.unwrap();
let conn = RLPxConnection::initiator(signer, msg, stream).await;
handle_peer(conn).await;
handle_peer(conn, table).await;
}

async fn handle_peer(mut conn: RLPxConnection<TcpStream>) {
async fn handle_peer(mut conn: RLPxConnection<TcpStream>, table: Arc<Mutex<KademliaTable>>) {
match conn.handshake().await {
Ok(_) => {
// TODO Properly build listen loop
Expand All @@ -797,8 +783,9 @@ async fn handle_peer(mut conn: RLPxConnection<TcpStream>) {
// }
}
Err(e) => {
// TODO propagate error to eventually discard peer from kademlia table
// Discard peer from kademlia table
info!("Handshake failed, discarding peer: ({e})");
table.lock().await.replace_peer(conn.get_remote_node_id());
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
_ => panic!("Received an unexpected message"),
}
}

pub fn get_remote_node_id(&self) -> H512 {
match &self.state {
RLPxConnectionState::Established(state) => state.remote_node_id,
// TODO proper error
_ => panic!("Invalid state"),
fkrause98 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

enum RLPxConnectionState {
Expand Down Expand Up @@ -358,6 +366,7 @@ impl ReceivedAuth {
}

struct InitiatedAuth {
pub(crate) remote_node_id: H512,
pub(crate) local_nonce: H256,
pub(crate) local_ephemeral_key: SecretKey,
pub(crate) local_init_message: Vec<u8>,
Expand All @@ -366,6 +375,7 @@ struct InitiatedAuth {
impl InitiatedAuth {
pub fn new(previous_state: &Initiator, local_init_message: Vec<u8>) -> Self {
Self {
remote_node_id: previous_state.remote_node_id,
local_nonce: previous_state.nonce,
local_ephemeral_key: previous_state.ephemeral_key.clone(),
local_init_message,
Expand All @@ -374,6 +384,7 @@ impl InitiatedAuth {
}

pub struct Established {
pub remote_node_id: H512,
pub(crate) mac_key: H256,
pub ingress_mac: Keccak256,
pub egress_mac: Keccak256,
Expand All @@ -391,6 +402,7 @@ impl Established {
.into();

Self::new(
previous_state.remote_node_id,
init_message,
previous_state.local_nonce,
previous_state.local_ephemeral_key.clone(),
Expand All @@ -413,6 +425,7 @@ impl Established {
Keccak256::digest([remote_nonce.0, previous_state.local_nonce.0].concat()).into();

Self::new(
previous_state.remote_node_id,
previous_state.local_init_message.clone(),
previous_state.local_nonce,
previous_state.local_ephemeral_key.clone(),
Expand All @@ -423,7 +436,9 @@ impl Established {
)
}

#[allow(clippy::too_many_arguments)]
fn new(
remote_node_id: H512,
local_init_message: Vec<u8>,
local_nonce: H256,
local_ephemeral_key: SecretKey,
Expand Down Expand Up @@ -456,6 +471,7 @@ impl Established {
let ingress_aes = <Aes256Ctr64BE as KeyIvInit>::new(&aes_key.0.into(), &[0; 16].into());
let egress_aes = ingress_aes.clone();
Self {
remote_node_id,
mac_key,
ingress_mac,
egress_mac,
Expand Down
2 changes: 0 additions & 2 deletions crates/networking/p2p/rlpx/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub(crate) fn decode_ack_message(
}

fn decrypt_message(static_key: &SecretKey, msg: &[u8], auth_data: [u8; 2]) -> Vec<u8> {
info!("msg {msg:?}");

// Split the message into its components. General layout is:
// public-key (65) || iv (16) || ciphertext || mac (32)
let (pk, rest) = msg.split_at(65);
Expand Down
10 changes: 7 additions & 3 deletions crates/networking/p2p/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ impl Node {
pub fn enode_url(&self) -> String {
let node_id = hex::encode(self.node_id);
let node_ip = self.ip;
let discovery_port = self.tcp_port;
let listener_port = self.udp_port;
format!("enode://{node_id}@{node_ip}:{listener_port}?discport={discovery_port}")
let discovery_port = self.udp_port;
let listener_port = self.tcp_port;
if discovery_port != listener_port {
format!("enode://{node_id}@{node_ip}:{listener_port}?discport={discovery_port}")
} else {
format!("enode://{node_id}@{node_ip}:{listener_port}")
}
}
}

Expand Down
Loading