diff --git a/Makefile b/Makefile index b2b09eb0b..5412ccfea 100644 --- a/Makefile +++ b/Makefile @@ -71,7 +71,7 @@ stop-localnet-silent: @kurtosis enclave stop lambdanet >/dev/null 2>&1 || true @kurtosis enclave rm lambdanet --force >/dev/null 2>&1 || true -HIVE_REVISION := fc6ddec210095e2369019e7f4ab2f9f38e35a8e8 +HIVE_REVISION := f220e0c55fb222aaaffdf17d66aa0537cd16a67a # Shallow clones can't specify a single revision, but at least we avoid working # the whole history by making it shallow since a given date (one day before our # target revision). diff --git a/README.md b/README.md index bd9126c30..d4bb0d54b 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,7 @@ ethrex supports the following command line arguments: - `--discovery.port `: UDP port for P2P discovery. Default value: 30303. - `--bootnodes `: Comma separated enode URLs for P2P discovery bootstrap. - `--log.level `: The verbosity level used for logs. Default value: info. possible values: info, debug, trace, warn, error +- `--syncmode `: The way in which the node will sync its state. Can be either "full" or "snap" with "snap" as default value. # ethrex L2 diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index b85e48548..bcab7d078 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -104,6 +104,12 @@ pub fn cli() -> Command { .required(false) .value_name("CHAIN_RLP_PATH"), ) + .arg( + Arg::new("syncmode") + .long("syncmode") + .required(false) + .value_name("SYNC_MODE"), + ) .arg( Arg::new("import_dir") .long("import_dir") diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index 371929734..926df4902 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -1,31 +1,29 @@ use bytes::Bytes; use directories::ProjectDirs; - -use ethrex_blockchain::add_block; -use ethrex_blockchain::fork_choice::apply_fork_choice; -use ethrex_core::types::{Block, Genesis}; -use ethrex_core::H256; -use ethrex_net::bootnode::BootNode; -use ethrex_net::node_id_from_signing_key; -use ethrex_net::types::Node; +use ethrex_blockchain::{add_block, fork_choice::apply_fork_choice}; +use ethrex_core::{ + types::{Block, Genesis}, + H256, +}; +use ethrex_net::{ + bootnode::BootNode, node_id_from_signing_key, peer_table, sync::SyncManager, types::Node, +}; use ethrex_rlp::decode::RLPDecode; use ethrex_storage::{EngineType, Store}; use k256::ecdsa::SigningKey; use local_ip_address::local_ip; -use std::fs; -use std::future::IntoFuture; -use std::path::Path; -use std::str::FromStr as _; -use std::time::Duration; use std::{ - fs::File, + fs::{self, File}, + future::IntoFuture, io, net::{Ipv4Addr, SocketAddr, ToSocketAddrs}, + path::Path, + str::FromStr as _, + time::Duration, }; use tokio_util::task::TaskTracker; use tracing::{error, info, warn}; -use tracing_subscriber::filter::Directive; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; +use tracing_subscriber::{filter::Directive, EnvFilter, FmtSubscriber}; mod cli; mod decode; @@ -117,6 +115,11 @@ async fn main() { .get_one::("datadir") .map_or(set_datadir(DEFAULT_DATADIR), |datadir| set_datadir(datadir)); + let snap_sync = is_snap_sync(&matches); + if snap_sync { + info!("snap-sync not available, defaulting to full-sync"); + } + let store = Store::new(&data_dir, EngineType::Libmdbx).expect("Failed to create Store"); let genesis = read_genesis_file(genesis_file_path); @@ -173,6 +176,10 @@ async fn main() { tcp_port: tcp_socket_addr.port(), node_id: local_node_id, }; + // Create Kademlia Table here so we can access it from rpc server (for syncing) + let peer_table = peer_table(signer.clone()); + // Create SyncManager + let syncer = SyncManager::new(peer_table.clone(), snap_sync); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); @@ -182,6 +189,7 @@ async fn main() { store.clone(), jwt_secret, local_p2p_node, + syncer, ) .into_future(); @@ -215,6 +223,7 @@ async fn main() { tcp_socket_addr, bootnodes, signer, + peer_table, store, ) .into_future(); @@ -282,6 +291,19 @@ fn parse_socket_addr(addr: &str, port: &str) -> io::Result { )) } +fn is_snap_sync(matches: &clap::ArgMatches) -> bool { + let syncmode = matches.get_one::("syncmode"); + if let Some(syncmode) = syncmode { + match &**syncmode { + "full" => false, + "snap" => true, + other => panic!("Invalid syncmode {other} expected either snap or full"), + } + } else { + true + } +} + fn set_datadir(datadir: &str) -> String { let project_dir = ProjectDirs::from("", "", datadir).expect("Couldn't find home directory"); project_dir diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index d0d5e27e1..10c57bd41 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -7,9 +7,10 @@ edition = "2021" [dependencies] ethrex-core.workspace = true +ethrex-blockchain.workspace = true ethrex-rlp.workspace = true ethrex-storage.workspace = true -ethrex-blockchain.workspace = true +ethrex-trie.workspace = true tracing.workspace = true tokio.workspace = true diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs index 94b665001..c87e47733 100644 --- a/crates/networking/p2p/kademlia.rs +++ b/crates/networking/p2p/kademlia.rs @@ -1,10 +1,12 @@ use crate::{ discv4::{time_now_unix, FindNodeRequest}, + peer_channels::PeerChannels, types::Node, }; use ethrex_core::{H256, H512, U256}; use sha3::{Digest, Keccak256}; use tokio::sync::mpsc::UnboundedSender; +use tracing::info; pub const MAX_NODES_PER_BUCKET: usize = 16; const NUMBER_OF_BUCKETS: usize = 256; @@ -187,7 +189,7 @@ impl KademliaTable { /// ## Dev note: /// This function should be improved: /// We might keep the `peers` list sorted by last_ping as we would avoid unnecessary loops - pub fn get_least_recently_pinged_peers(&mut self, limit: usize) -> Vec { + pub fn get_least_recently_pinged_peers(&self, limit: usize) -> Vec { let mut peers = vec![]; for bucket in &self.buckets { @@ -254,6 +256,40 @@ impl KademliaTable { None } + + /// Set the sender end of the channel between the kademlia table and the peer's active connection + /// This function should be called each time a connection is established so the backend can send requests to the peers + pub fn set_channels(&mut self, node_id: H512, channels: PeerChannels) { + let bucket_idx = bucket_number(self.local_node_id, node_id); + if let Some(peer) = self.buckets.get_mut(bucket_idx).and_then(|bucket| { + bucket + .peers + .iter_mut() + .find(|peer| peer.node.node_id == node_id) + }) { + peer.channels = Some(channels) + } + } + + /// TODO: Randomly select peer + pub fn get_peer(&self) -> Option { + self.get_least_recently_pinged_peers(1).pop() + } + + /// Returns the channel ends to an active peer connection + /// The peer is selected randomly (TODO), and doesn't guarantee that the selected peer is not currenlty busy + /// If no peer is found, this method will try again after 10 seconds + /// TODO: Filter peers by capabilities, set max amount of retries + pub async fn get_peer_channels(&self) -> PeerChannels { + loop { + if let Some(channels) = self.get_peer().and_then(|peer| peer.channels) { + return channels; + } + info!("[Sync] No peers available, retrying in 10 sec"); + // This is the unlikely case where we just started the node and don't have peers, wait a bit and try again + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + } + } } /// Computes the distance between two nodes according to the discv4 protocol @@ -279,6 +315,8 @@ pub struct PeerData { pub liveness: u16, /// if a revalidation was sent to the peer, the bool marks if it has answered pub revalidation: Option, + /// communication channels between the peer data and its active connection + pub channels: Option, } impl PeerData { @@ -292,6 +330,7 @@ impl PeerData { last_ping_hash: None, find_node_request: None, revalidation: None, + channels: None, } } diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs index c5c225531..2de5c1c69 100644 --- a/crates/networking/p2p/net.rs +++ b/crates/networking/p2p/net.rs @@ -16,7 +16,8 @@ use k256::{ ecdsa::SigningKey, elliptic_curve::{sec1::ToEncodedPoint, PublicKey}, }; -use kademlia::{bucket_number, KademliaTable, MAX_NODES_PER_BUCKET}; +pub use kademlia::KademliaTable; +use kademlia::{bucket_number, MAX_NODES_PER_BUCKET}; use rand::rngs::OsRng; use rlpx::{connection::RLPxConnection, message::Message as RLPxMessage}; use tokio::{ @@ -30,8 +31,10 @@ use types::{Endpoint, Node}; pub mod bootnode; pub(crate) mod discv4; pub(crate) mod kademlia; +pub mod peer_channels; pub mod rlpx; pub(crate) mod snap; +pub mod sync; pub mod types; const MAX_DISC_PACKET_SIZE: usize = 1280; @@ -42,17 +45,21 @@ const MAX_DISC_PACKET_SIZE: usize = 1280; // we should bump this limit. const MAX_MESSAGES_TO_BROADCAST: usize = 1000; +pub fn peer_table(signer: SigningKey) -> Arc> { + let local_node_id = node_id_from_signing_key(&signer); + Arc::new(Mutex::new(KademliaTable::new(local_node_id))) +} + pub async fn start_network( udp_addr: SocketAddr, tcp_addr: SocketAddr, bootnodes: Vec, signer: SigningKey, + peer_table: Arc>, storage: Store, ) { info!("Starting discovery service at {udp_addr}"); info!("Listening for requests at {tcp_addr}"); - let local_node_id = node_id_from_signing_key(&signer); - let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id))); let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<( tokio::task::Id, Arc, @@ -61,7 +68,7 @@ pub async fn start_network( udp_addr, signer.clone(), storage.clone(), - table.clone(), + peer_table.clone(), bootnodes, channel_broadcast_send_end.clone(), )); @@ -69,7 +76,7 @@ pub async fn start_network( tcp_addr, signer.clone(), storage.clone(), - table.clone(), + peer_table.clone(), channel_broadcast_send_end, )); diff --git a/crates/networking/p2p/peer_channels.rs b/crates/networking/p2p/peer_channels.rs new file mode 100644 index 000000000..47858de15 --- /dev/null +++ b/crates/networking/p2p/peer_channels.rs @@ -0,0 +1,112 @@ +use std::{sync::Arc, time::Duration}; + +use ethrex_core::{ + types::{BlockBody, BlockHeader}, + H256, +}; +use tokio::sync::{mpsc, Mutex}; + +use crate::{ + rlpx::eth::blocks::{ + BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT, + }, + RLPxMessage, +}; + +pub const PEER_REPLY_TIMOUT: Duration = Duration::from_secs(45); +pub const MAX_MESSAGES_IN_PEER_CHANNEL: usize = 25; + +#[derive(Debug, Clone)] +/// Holds the respective sender and receiver ends of the communication channels bewteen the peer data and its active connection +pub struct PeerChannels { + sender: mpsc::Sender, + receiver: Arc>>, +} + +impl PeerChannels { + /// Sets up the communication channels for the peer + /// Returns the channel endpoints to send to the active connection's listen loop + pub(crate) fn create() -> (Self, mpsc::Sender, mpsc::Receiver) { + let (sender, connection_receiver) = + mpsc::channel::(MAX_MESSAGES_IN_PEER_CHANNEL); + let (connection_sender, receiver) = + mpsc::channel::(MAX_MESSAGES_IN_PEER_CHANNEL); + ( + Self { + sender, + receiver: Arc::new(Mutex::new(receiver)), + }, + connection_sender, + connection_receiver, + ) + } + + /// Requests block headers from the peer + /// Returns the response message or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - The response timed out + /// - The response was empty or not valid + pub async fn request_block_headers(&self, start: H256) -> Option> { + let request_id = rand::random(); + let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { + id: request_id, + startblock: start.into(), + limit: BLOCK_HEADER_LIMIT, + skip: 0, + reverse: false, + }); + self.sender.send(request).await.ok()?; + let mut receiver = self.receiver.lock().await; + let block_headers = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers })) + if id == request_id => + { + return Some(block_headers) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok()??; + (!block_headers.is_empty()).then_some(block_headers) + } + + /// Requests block headers from the peer + /// Returns the response message or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - The response timed out + /// - The response was empty or not valid + pub async fn request_block_bodies(&self, block_hashes: Vec) -> Option> { + let block_hashes_len = block_hashes.len(); + let request_id = rand::random(); + let request = RLPxMessage::GetBlockBodies(GetBlockBodies { + id: request_id, + block_hashes, + }); + self.sender.send(request).await.ok()?; + let mut receiver = self.receiver.lock().await; + let block_bodies = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::BlockBodies(BlockBodies { id, block_bodies })) + if id == request_id => + { + return Some(block_bodies) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok()??; + // Check that the response is not empty and does not contain more bodies than the ones requested + (!block_bodies.is_empty() && block_bodies.len() <= block_hashes_len).then_some(block_bodies) + } +} diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 7e2962f01..1d4e5e09a 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use crate::{ + peer_channels::PeerChannels, rlpx::{ eth::{ backend, @@ -23,7 +24,7 @@ use super::{ error::RLPxError, frame, handshake::{decode_ack_message, decode_auth_message, encode_auth_message}, - message as rlpx, + message::{self as rlpx}, p2p::Capability, utils::{ecdh_xchng, pubkey2id}, }; @@ -41,7 +42,7 @@ use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::{ broadcast::{self, error::RecvError}, - Mutex, + mpsc, Mutex, }, task, time::{sleep, Instant}, @@ -151,7 +152,19 @@ impl RLPxConnection { self.peer_conn_failed("Handshake failed", e, table).await; } else { // Handshake OK: handle connection - if let Err(e) = self.handle_peer_conn().await { + // Create channels to communicate directly to the peer + let (peer_channels, sender, receiver) = PeerChannels::create(); + let Ok(node_id) = self.get_remote_node_id() else { + return self + .peer_conn_failed( + "Error during RLPx connection", + RLPxError::InvalidState(), + table, + ) + .await; + }; + table.lock().await.set_channels(node_id, peer_channels); + if let Err(e) = self.handle_peer_conn(sender, receiver).await { self.peer_conn_failed("Error during RLPx connection", e, table) .await; } @@ -238,7 +251,11 @@ impl RLPxConnection { } } - async fn handle_peer_conn(&mut self) -> Result<(), RLPxError> { + async fn handle_peer_conn( + &mut self, + sender: mpsc::Sender, + mut receiver: mpsc::Receiver, + ) -> Result<(), RLPxError> { if let RLPxConnectionState::Established(_) = &self.state { self.init_peer_conn().await?; debug!("Started peer main loop"); @@ -257,7 +274,7 @@ impl RLPxConnection { tokio::select! { // TODO check if this is cancel safe, and fix it if not. message = self.receive() => { - self.handle_message(message?).await?; + self.handle_message(message?, sender.clone()).await?; } // This is not ideal, but using the receiver without // this function call, causes the loop to take ownwership @@ -270,6 +287,9 @@ impl RLPxConnection { Some(broadcasted_msg) = Self::maybe_wait_for_broadcaster(&mut broadcaster_receive) => { self.handle_broadcast(broadcasted_msg?).await? } + Some(message) = receiver.recv() => { + self.send(message).await?; + } _ = sleep(PERIODIC_TASKS_CHECK_INTERVAL) => { // no progress on other tasks, yield control to check // periodic tasks @@ -308,7 +328,11 @@ impl RLPxConnection { Ok(()) } - async fn handle_message(&mut self, message: Message) -> Result<(), RLPxError> { + async fn handle_message( + &mut self, + message: Message, + sender: mpsc::Sender, + ) -> Result<(), RLPxError> { let peer_supports_eth = self.capabilities.contains(&CAP_ETH); match message { Message::Disconnect(msg_data) => { @@ -365,6 +389,14 @@ impl RLPxConnection { let response = process_trie_nodes_request(req, self.storage.clone())?; self.send(Message::TrieNodes(response)).await? } + // Send response messages to the backend + message @ Message::AccountRange(_) + | message @ Message::StorageRanges(_) + | message @ Message::ByteCodes(_) + | message @ Message::TrieNodes(_) + | message @ Message::BlockBodies(_) + | message @ Message::BlockHeaders(_) + | message @ Message::Receipts(_) => sender.send(message).await?, // TODO: Add new message types and handlers as they are implemented message => return Err(RLPxError::MessageNotHandled(format!("{message}"))), }; diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index c7b5abae8..66f6c927b 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -4,6 +4,8 @@ use ethrex_storage::error::StoreError; use thiserror::Error; use tokio::sync::broadcast::error::RecvError; +use super::message::Message; + // TODO improve errors #[derive(Debug, Error)] pub(crate) enum RLPxError { @@ -39,6 +41,8 @@ pub(crate) enum RLPxError { BroadcastError(String), #[error(transparent)] RecvError(#[from] RecvError), + #[error(transparent)] + Send(#[from] tokio::sync::mpsc::error::SendError), #[error("Error when inserting transaction in the mempool: {0}")] MempoolError(#[from] MempoolError), } diff --git a/crates/networking/p2p/rlpx/eth/blocks.rs b/crates/networking/p2p/rlpx/eth/blocks.rs index 4d50374c0..01ae0d3c1 100644 --- a/crates/networking/p2p/rlpx/eth/blocks.rs +++ b/crates/networking/p2p/rlpx/eth/blocks.rs @@ -37,6 +37,12 @@ impl RLPEncode for HashOrNumber { } } +impl From for HashOrNumber { + fn from(value: BlockHash) -> Self { + Self::Hash(value) + } +} + impl RLPDecode for HashOrNumber { fn decode_unfinished(buf: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { let first_byte = buf.first().ok_or(RLPDecodeError::InvalidLength)?; @@ -54,8 +60,8 @@ impl RLPDecode for HashOrNumber { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockheaders-0x03 -#[derive(Debug)] -pub(crate) struct GetBlockHeaders { +#[derive(Debug, Clone)] +pub struct GetBlockHeaders { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages pub id: u64, @@ -159,8 +165,8 @@ impl RLPxMessage for GetBlockHeaders { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockheaders-0x04 -#[derive(Debug)] -pub(crate) struct BlockHeaders { +#[derive(Debug, Clone)] +pub struct BlockHeaders { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages pub id: u64, @@ -199,8 +205,8 @@ impl RLPxMessage for BlockHeaders { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockbodies-0x05 -#[derive(Debug)] -pub(crate) struct GetBlockBodies { +#[derive(Debug, Clone)] +pub struct GetBlockBodies { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages pub id: u64, @@ -264,8 +270,8 @@ impl RLPxMessage for GetBlockBodies { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockbodies-0x06 -#[derive(Debug)] -pub(crate) struct BlockBodies { +#[derive(Debug, Clone)] +pub struct BlockBodies { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages pub id: u64, diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 0ec74af35..6c02119d8 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -48,6 +48,7 @@ impl RLPxMessage for GetReceipts { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#receipts-0x10 +#[derive(Debug)] pub(crate) struct Receipts { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 8ad8c627c..0f2af96b0 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -12,7 +12,7 @@ use crate::rlpx::{ // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02 // Broadcast message -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct Transactions { pub(crate) transactions: Vec, } diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index d44f1b4ca..377d6966a 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -3,6 +3,7 @@ use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; use std::fmt::Display; use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders}; +use super::eth::receipts::Receipts; use super::eth::status::StatusMessage; use super::eth::transactions::Transactions; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; @@ -31,6 +32,7 @@ pub(crate) enum Message { Transactions(Transactions), GetBlockBodies(GetBlockBodies), BlockBodies(BlockBodies), + Receipts(Receipts), // snap capability GetAccountRange(GetAccountRange), AccountRange(AccountRange), @@ -64,6 +66,8 @@ impl Message { 0x13 => Ok(Message::GetBlockHeaders(GetBlockHeaders::decode(msg_data)?)), 0x14 => Ok(Message::BlockHeaders(BlockHeaders::decode(msg_data)?)), 0x15 => Ok(Message::GetBlockBodies(GetBlockBodies::decode(msg_data)?)), + 0x16 => Ok(Message::BlockBodies(BlockBodies::decode(msg_data)?)), + 0x20 => Ok(Message::Receipts(Receipts::decode(msg_data)?)), 0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)), 0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)), 0x23 => Ok(Message::GetStorageRanges(GetStorageRanges::decode( @@ -120,6 +124,10 @@ impl Message { 0x16_u8.encode(buf); msg.encode(buf) } + Message::Receipts(msg) => { + 0x20_u8.encode(buf); + msg.encode(buf) + } Message::GetAccountRange(msg) => { 0x21_u8.encode(buf); msg.encode(buf) @@ -169,6 +177,7 @@ impl Display for Message { Message::BlockBodies(_) => "eth:BlockBodies".fmt(f), Message::Transactions(_) => "eth:TransactionsMessage".fmt(f), Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), + Message::Receipts(_) => "eth:Receipts".fmt(f), Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f), Message::AccountRange(_) => "snap:AccountRange".fmt(f), Message::GetStorageRanges(_) => "snap:GetStorageRanges".fmt(f), diff --git a/crates/networking/p2p/snap.rs b/crates/networking/p2p/snap.rs index 498101954..bf9ca53e8 100644 --- a/crates/networking/p2p/snap.rs +++ b/crates/networking/p2p/snap.rs @@ -10,6 +10,8 @@ use crate::rlpx::{ }, }; +// Request Processing + pub fn process_account_range_request( request: GetAccountRange, store: Store, @@ -26,15 +28,11 @@ pub fn process_account_range_request( break; } } - let proof = store - .get_account_range_proof( - request.root_hash, - request.starting_hash, - accounts.last().map(|acc| acc.hash), - )? - .iter() - .map(|bytes| Bytes::copy_from_slice(bytes)) - .collect(); + let proof = proof_to_encodable(store.get_account_range_proof( + request.root_hash, + request.starting_hash, + accounts.last().map(|acc| acc.hash), + )?); Ok(AccountRange { id: request.id, accounts, @@ -72,7 +70,7 @@ pub fn process_storage_ranges_request( // Generate proofs only if the response doesn't contain the full storage range for the account // Aka if the starting hash is not zero or if the response was capped due to byte limit if !request.starting_hash.is_zero() || res_capped && !account_slots.is_empty() { - proof.extend( + proof.extend(proof_to_encodable( store .get_storage_range_proof( request.root_hash, @@ -80,10 +78,8 @@ pub fn process_storage_ranges_request( request.starting_hash, account_slots.last().map(|acc| acc.hash), )? - .unwrap_or_default() - .iter() - .map(|bytes| Bytes::copy_from_slice(bytes)), - ); + .unwrap_or_default(), + )); } if !account_slots.is_empty() { @@ -153,6 +149,19 @@ pub fn process_trie_nodes_request( }) } +// Helper method to convert proof to RLP-encodable format +#[inline] +fn proof_to_encodable(proof: Vec>) -> Vec { + proof.into_iter().map(Bytes::from).collect() +} + +// Helper method to obtain proof from RLP-encodable format +#[inline] +#[allow(unused)] +fn encodable_to_proof(proof: &[Bytes]) -> Vec> { + proof.iter().map(|bytes| bytes.to_vec()).collect() +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs new file mode 100644 index 000000000..551967237 --- /dev/null +++ b/crates/networking/p2p/sync.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; + +use ethrex_blockchain::error::ChainError; +use ethrex_core::{ + types::{Block, BlockHash, BlockHeader}, + H256, +}; +use ethrex_storage::Store; +use tokio::{sync::Mutex, time::Instant}; +use tracing::{debug, info, warn}; + +use crate::kademlia::KademliaTable; + +/// Manager in charge the sync process +/// Only performs full-sync but will also be in charge of snap-sync in the future +#[derive(Debug)] +pub struct SyncManager { + // true: syncmode = snap, false = syncmode = full + #[allow(unused)] + snap_mode: bool, + peers: Arc>, +} + +impl SyncManager { + pub fn new(peers: Arc>, snap_mode: bool) -> Self { + Self { snap_mode, peers } + } + + /// Starts a sync cycle, updating the state with all blocks between the current head and the sync head + /// TODO: only uses full sync, should also process snap sync once implemented + pub async fn start_sync(&mut self, mut current_head: H256, sync_head: H256, store: Store) { + info!("Syncing from current head {current_head} to sync_head {sync_head}"); + let start_time = Instant::now(); + // Request all block headers between the current head and the sync head + // We will begin from the current head so that we download the earliest state first + // This step is not parallelized + let mut all_block_headers = vec![]; + let mut all_block_hashes = vec![]; + loop { + let peer = self.peers.lock().await.get_peer_channels().await; + debug!("Requesting Block Headers from {current_head}"); + // Request Block Headers from Peer + if let Some(block_headers) = peer.request_block_headers(current_head).await { + debug!("Received {} block headers", block_headers.len()); + let block_hashes = block_headers + .iter() + .map(|header| header.compute_block_hash()) + .collect::>(); + // Keep headers so we can process them later + // Discard the first header as we already have it + all_block_headers.extend_from_slice(&block_headers[1..]); + all_block_hashes.extend_from_slice(&block_hashes[1..]); + + // Check if we already reached our sync head or if we need to fetch more blocks + if !block_hashes.contains(&sync_head) { + // Update the request to fetch the next batch + current_head = *block_hashes.last().unwrap(); + } else { + // No more headers to request + break; + } + } + } + // We finished fetching all headers, now we can process them + // TODO: snap-sync: launch tasks to fetch blocks and state in parallel + // full-sync: Fetch all block bodies and execute them sequentially to build the state + match tokio::spawn(download_and_run_blocks( + all_block_hashes, + all_block_headers, + self.peers.clone(), + store.clone(), + )) + .await + { + Ok(Ok(())) => info!( + "Sync finished, time elapsed: {} secs", + start_time.elapsed().as_secs() + ), + Ok(Err(error)) => warn!( + "Sync failed due to {error}, time elapsed: {} secs ", + start_time.elapsed().as_secs() + ), + _ => warn!( + "Sync failed due to internal error, time elapsed: {} secs", + start_time.elapsed().as_secs() + ), + } + } + + /// Creates a dummy SyncManager for tests where syncing is not needed + /// This should only be used in tests as it won't be able to connect to the p2p network + pub fn dummy() -> Self { + let dummy_peer_table = Arc::new(Mutex::new(KademliaTable::new(Default::default()))); + Self { + snap_mode: false, + peers: dummy_peer_table, + } + } +} + +/// Requests block bodies from peers via p2p, executes and stores them +/// Returns an error if there was a problem while executing or validating the blocks +async fn download_and_run_blocks( + mut block_hashes: Vec, + mut block_headers: Vec, + peers: Arc>, + store: Store, +) -> Result<(), ChainError> { + loop { + let peer = peers.lock().await.get_peer_channels().await; + debug!("Requesting Block Bodies "); + if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await { + let block_bodies_len = block_bodies.len(); + debug!("Received {} Block Bodies", block_bodies_len); + // Execute and store blocks + for body in block_bodies.into_iter() { + // We already validated that there are no more block bodies than the ones requested + let header = block_headers.remove(0); + let hash = block_hashes.remove(0); + let number = header.number; + let block = Block::new(header, body); + if let Err(error) = ethrex_blockchain::add_block(&block, &store) { + warn!("Failed to add block during FullSync: {error}"); + return Err(error); + } + store.set_canonical_block(number, hash)?; + store.update_latest_block_number(number)?; + } + debug!("Executed & stored {} blocks", block_bodies_len); + // Check if we need to ask for another batch + if block_hashes.is_empty() { + break; + } + } + } + Ok(()) +} diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index d6834ba09..8ef1e5aa4 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -58,36 +58,56 @@ impl RpcHandler for ForkChoiceUpdatedV3 { } fn handle(&self, context: RpcApiContext) -> Result { - let storage = &context.storage; info!( "New fork choice request with head: {}, safe: {}, finalized: {}.", self.fork_choice_state.head_block_hash, self.fork_choice_state.safe_block_hash, self.fork_choice_state.finalized_block_hash ); - let fork_choice_error_to_response = |error| { - let response = match error { - InvalidForkChoice::NewHeadAlreadyCanonical => ForkChoiceResponse::from( - PayloadStatus::valid_with_hash(latest_canonical_block_hash(storage).unwrap()), - ), - InvalidForkChoice::Syncing => ForkChoiceResponse::from(PayloadStatus::syncing()), - reason => { - warn!("Invalid fork choice state. Reason: {:#?}", reason); - return Err(RpcErr::InvalidForkChoiceState(reason.to_string())); - } - }; - - serde_json::to_value(response).map_err(|error| RpcErr::Internal(error.to_string())) - }; let head_block = match apply_fork_choice( - storage, + &context.storage, self.fork_choice_state.head_block_hash, self.fork_choice_state.safe_block_hash, self.fork_choice_state.finalized_block_hash, ) { Ok(head) => head, - Err(error) => return fork_choice_error_to_response(error), + Err(error) => { + let fork_choice_response = match error { + InvalidForkChoice::NewHeadAlreadyCanonical => { + ForkChoiceResponse::from(PayloadStatus::valid_with_hash( + latest_canonical_block_hash(&context.storage).unwrap(), + )) + } + InvalidForkChoice::Syncing => { + // Start sync + let current_number = context.storage.get_latest_block_number()?.unwrap(); + let Some(current_head) = + context.storage.get_canonical_block_hash(current_number)? + else { + return Err(RpcErr::Internal( + "Missing latest canonical block".to_owned(), + )); + }; + let sync_head = self.fork_choice_state.head_block_hash; + tokio::spawn(async move { + // If we can't get hold of the syncer, then it means that there is an active sync in process + if let Ok(mut syncer) = context.syncer.try_lock() { + syncer + .start_sync(current_head, sync_head, context.storage.clone()) + .await + } + }); + ForkChoiceResponse::from(PayloadStatus::syncing()) + } + reason => { + warn!("Invalid fork choice state. Reason: {:#?}", reason); + return Err(RpcErr::InvalidForkChoiceState(reason.to_string())); + } + }; + return serde_json::to_value(fork_choice_response) + .map_err(|error| RpcErr::Internal(error.to_string())); + } }; // Build block from received payload. This step is skipped if applying the fork choice state failed @@ -101,7 +121,7 @@ impl RpcHandler for ForkChoiceUpdatedV3 { Ok(None) => (), Ok(Some(attributes)) => { info!("Fork choice updated includes payload attributes. Creating a new payload."); - let chain_config = storage.get_chain_config()?; + let chain_config = context.storage.get_chain_config()?; if !chain_config.is_cancun_activated(attributes.timestamp) { return Err(RpcErr::UnsuportedFork( "forkChoiceV3 used to build pre-Cancun payload".to_string(), @@ -123,14 +143,14 @@ impl RpcHandler for ForkChoiceUpdatedV3 { }; let payload_id = args.id(); response.set_id(payload_id); - let payload = match create_payload(&args, storage) { + let payload = match create_payload(&args, &context.storage) { Ok(payload) => payload, Err(ChainError::EvmError(error)) => return Err(error.into()), // Parent block is guaranteed to be present at this point, // so the only errors that may be returned are internal storage errors Err(error) => return Err(RpcErr::Internal(error.to_string())), }; - storage.add_payload(payload_id, payload)?; + context.storage.add_payload(payload_id, payload)?; } } diff --git a/crates/networking/rpc/eth/filter.rs b/crates/networking/rpc/eth/filter.rs index 0a7027dfd..197dbc02b 100644 --- a/crates/networking/rpc/eth/filter.rs +++ b/crates/networking/rpc/eth/filter.rs @@ -257,6 +257,7 @@ mod tests { sync::{Arc, Mutex}, time::{Duration, Instant}, }; + use tokio::sync::Mutex as TokioMutex; use super::ActiveFilters; use crate::{ @@ -273,6 +274,7 @@ mod tests { utils::{test_utils::example_p2p_node, RpcRequest}, }; use ethrex_core::types::Genesis; + use ethrex_net::sync::SyncManager; use ethrex_storage::{EngineType, Store}; use serde_json::{json, Value}; @@ -442,6 +444,7 @@ mod tests { jwt_secret: Default::default(), local_p2p_node: example_p2p_node(), active_filters: filters_pointer.clone(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; let request: RpcRequest = serde_json::from_value(json_req).expect("Test json is incorrect"); let genesis_config: Genesis = @@ -494,6 +497,7 @@ mod tests { local_p2p_node: example_p2p_node(), jwt_secret: Default::default(), active_filters: active_filters.clone(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; map_http_requests(&uninstall_filter_req, context).unwrap(); @@ -513,6 +517,7 @@ mod tests { local_p2p_node: example_p2p_node(), active_filters: active_filters.clone(), jwt_secret: Default::default(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; let uninstall_filter_req: RpcRequest = serde_json::from_value(json!( { diff --git a/crates/networking/rpc/eth/gas_price.rs b/crates/networking/rpc/eth/gas_price.rs index cba06811f..c34122654 100644 --- a/crates/networking/rpc/eth/gas_price.rs +++ b/crates/networking/rpc/eth/gas_price.rs @@ -114,11 +114,12 @@ mod tests { }, Address, Bloom, H256, U256, }; - use ethrex_net::types::Node; + use ethrex_net::{sync::SyncManager, types::Node}; use ethrex_storage::{EngineType, Store}; use hex_literal::hex; use serde_json::json; - use std::{net::Ipv4Addr, str::FromStr}; + use std::{net::Ipv4Addr, str::FromStr, sync::Arc}; + use tokio::sync::Mutex; // Base price for each test transaction. const BASE_PRICE_IN_WEI: u64 = 10_u64.pow(9); fn test_header(block_num: u64) -> BlockHeader { @@ -385,6 +386,7 @@ mod tests { node_id: Default::default(), }, active_filters: Default::default(), + syncer: Arc::new(Mutex::new(SyncManager::dummy())), } } } diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index d90762c2a..80581d2ab 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -32,6 +32,7 @@ use eth::{ GetTransactionByHashRequest, GetTransactionReceiptRequest, }, }; +use ethrex_net::sync::SyncManager; use serde_json::Value; use std::{ collections::HashMap, @@ -40,7 +41,7 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::Mutex as TokioMutex}; use tracing::info; use types::transaction::SendRawTransactionRequest; use utils::{ @@ -65,6 +66,7 @@ pub struct RpcApiContext { jwt_secret: Bytes, local_p2p_node: Node, active_filters: ActiveFilters, + syncer: Arc>, } trait RpcHandler: Sized { @@ -92,6 +94,7 @@ pub async fn start_api( storage: Store, jwt_secret: Bytes, local_p2p_node: Node, + syncer: SyncManager, ) { // TODO: Refactor how filters are handled, // filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc) @@ -101,6 +104,7 @@ pub async fn start_api( jwt_secret, local_p2p_node, active_filters: active_filters.clone(), + syncer: Arc::new(TokioMutex::new(syncer)), }; // Periodically clean up the active filters for the filters endpoints. @@ -325,6 +329,7 @@ mod tests { storage, jwt_secret: Default::default(), active_filters: Default::default(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; let result = map_http_requests(&request, context); let rpc_response = rpc_response(request.id, result); @@ -362,6 +367,7 @@ mod tests { storage, jwt_secret: Default::default(), active_filters: Default::default(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; let result = map_http_requests(&request, context); let response = rpc_response(request.id, result); @@ -391,6 +397,7 @@ mod tests { storage, jwt_secret: Default::default(), active_filters: Default::default(), + syncer: Arc::new(TokioMutex::new(SyncManager::dummy())), }; let result = map_http_requests(&request, context); let response = diff --git a/crates/networking/rpc/utils.rs b/crates/networking/rpc/utils.rs index 4ea572ed1..23e489b7f 100644 --- a/crates/networking/rpc/utils.rs +++ b/crates/networking/rpc/utils.rs @@ -247,7 +247,7 @@ pub mod test_utils { use std::{net::SocketAddr, str::FromStr}; use ethrex_core::H512; - use ethrex_net::types::Node; + use ethrex_net::{sync::SyncManager, types::Node}; use ethrex_storage::{EngineType, Store}; use crate::start_api; @@ -285,6 +285,14 @@ pub mod test_utils { let jwt_secret = Default::default(); let local_p2p_node = example_p2p_node(); - start_api(http_addr, authrpc_addr, storage, jwt_secret, local_p2p_node).await; + start_api( + http_addr, + authrpc_addr, + storage, + jwt_secret, + local_p2p_node, + SyncManager::dummy(), + ) + .await; } } diff --git a/crates/storage/trie/db.rs b/crates/storage/trie/db.rs index c124ff848..e2f5249de 100644 --- a/crates/storage/trie/db.rs +++ b/crates/storage/trie/db.rs @@ -3,7 +3,6 @@ pub mod in_memory; pub mod libmdbx; #[cfg(feature = "libmdbx")] pub mod libmdbx_dupsort; -pub mod null; use crate::error::TrieError; diff --git a/crates/storage/trie/db/null.rs b/crates/storage/trie/db/null.rs deleted file mode 100644 index 69df1a52d..000000000 --- a/crates/storage/trie/db/null.rs +++ /dev/null @@ -1,15 +0,0 @@ -use super::TrieDB; -use crate::error::TrieError; - -/// Used for small/pruned tries that don't have a database and just cache their nodes. -pub struct NullTrieDB; - -impl TrieDB for NullTrieDB { - fn get(&self, _key: Vec) -> Result>, TrieError> { - Ok(None) - } - - fn put(&self, _key: Vec, _value: Vec) -> Result<(), TrieError> { - Ok(()) - } -} diff --git a/crates/storage/trie/trie.rs b/crates/storage/trie/trie.rs index af1e610e8..2177e7399 100644 --- a/crates/storage/trie/trie.rs +++ b/crates/storage/trie/trie.rs @@ -7,10 +7,6 @@ mod rlp; mod state; #[cfg(test)] mod test_utils; - -use std::collections::HashSet; - -use db::null::NullTrieDB; mod trie_iter; mod verify_range; use ethereum_types::H256; @@ -19,6 +15,7 @@ use nibbles::Nibbles; use node::Node; use node_hash::NodeHash; use sha3::{Digest, Keccak256}; +use std::collections::HashSet; #[cfg(feature = "libmdbx")] pub use self::db::{libmdbx::LibmdbxTrieDB, libmdbx_dupsort::LibmdbxDupsortTrieDB}; @@ -207,7 +204,7 @@ impl Trie { root_node: Option<&NodeRLP>, other_nodes: &[NodeRLP], ) -> Result { - let mut trie = Trie::new(Box::new(NullTrieDB)); + let mut trie = Trie::stateless(); if let Some(root_node) = root_node { let root_node = Node::decode_raw(root_node)?;