Skip to content

Commit

Permalink
feat(l1): full sync (#1312)
Browse files Browse the repository at this point in the history
**Motivation**
Support syncing via fullsync
<!-- Why does this pull request exist? What are its goals? -->

**Description**

- Implement full sync
- Trigger full sync in fork choice update
- Implement communication between the backend and peers in the p2p crate via
channels that allow sending and receiving messages between the peer data in
the Kademlia table and the peer's active connection listen loop. This way,
backend processes, such as syncing, can make requests to and receive
responses from a chosen peer

Leftover work: #1317 #1318 

Status:
Successfully passes `ethereum/sync` test suite when removing V3-specific
fork choice & payload checks (see commit
[c6d6767](c6d6767))

This PR shares the same basis as snap-sync #1209 

<!-- Link to issues: Resolves #111, Resolves #222 -->

Closes: None, but is a good basis for snap sync

---------

Co-authored-by: Esteban Dimitroff Hodi <[email protected]>
Co-authored-by: Martin Paulucci <[email protected]>
Co-authored-by: Francisco Krause Arnim <[email protected]>
  • Loading branch information
4 people authored Nov 28, 2024
1 parent 3162649 commit 9a2f8e0
Show file tree
Hide file tree
Showing 24 changed files with 508 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ stop-localnet-silent:
@kurtosis enclave stop lambdanet >/dev/null 2>&1 || true
@kurtosis enclave rm lambdanet --force >/dev/null 2>&1 || true

HIVE_REVISION := fc6ddec210095e2369019e7f4ab2f9f38e35a8e8
HIVE_REVISION := f220e0c55fb222aaaffdf17d66aa0537cd16a67a
# Shallow clones can't specify a single revision, but at least we avoid working
# the whole history by making it shallow since a given date (one day before our
# target revision).
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ ethrex supports the following command line arguments:
- `--discovery.port <PORT>`: UDP port for P2P discovery. Default value: 30303.
- `--bootnodes <BOOTNODE_LIST>`: Comma separated enode URLs for P2P discovery bootstrap.
- `--log.level <LOG_LEVEL>`: The verbosity level used for logs. Default value: info. possible values: info, debug, trace, warn, error
- `--syncmode <SYNC_MODE>`: The way in which the node will sync its state. Can be either "full" or "snap" with "snap" as default value.

# ethrex L2

Expand Down
6 changes: 6 additions & 0 deletions cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub fn cli() -> Command {
.required(false)
.value_name("CHAIN_RLP_PATH"),
)
.arg(
Arg::new("syncmode")
.long("syncmode")
.required(false)
.value_name("SYNC_MODE"),
)
.arg(
Arg::new("import_dir")
.long("import_dir")
Expand Down
54 changes: 38 additions & 16 deletions cmd/ethrex/ethrex.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
use bytes::Bytes;
use directories::ProjectDirs;

use ethrex_blockchain::add_block;
use ethrex_blockchain::fork_choice::apply_fork_choice;
use ethrex_core::types::{Block, Genesis};
use ethrex_core::H256;
use ethrex_net::bootnode::BootNode;
use ethrex_net::node_id_from_signing_key;
use ethrex_net::types::Node;
use ethrex_blockchain::{add_block, fork_choice::apply_fork_choice};
use ethrex_core::{
types::{Block, Genesis},
H256,
};
use ethrex_net::{
bootnode::BootNode, node_id_from_signing_key, peer_table, sync::SyncManager, types::Node,
};
use ethrex_rlp::decode::RLPDecode;
use ethrex_storage::{EngineType, Store};
use k256::ecdsa::SigningKey;
use local_ip_address::local_ip;
use std::fs;
use std::future::IntoFuture;
use std::path::Path;
use std::str::FromStr as _;
use std::time::Duration;
use std::{
fs::File,
fs::{self, File},
future::IntoFuture,
io,
net::{Ipv4Addr, SocketAddr, ToSocketAddrs},
path::Path,
str::FromStr as _,
time::Duration,
};
use tokio_util::task::TaskTracker;
use tracing::{error, info, warn};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use tracing_subscriber::{filter::Directive, EnvFilter, FmtSubscriber};
mod cli;
mod decode;

Expand Down Expand Up @@ -117,6 +115,11 @@ async fn main() {
.get_one::<String>("datadir")
.map_or(set_datadir(DEFAULT_DATADIR), |datadir| set_datadir(datadir));

let snap_sync = is_snap_sync(&matches);
if snap_sync {
info!("snap-sync not available, defaulting to full-sync");
}

let store = Store::new(&data_dir, EngineType::Libmdbx).expect("Failed to create Store");

let genesis = read_genesis_file(genesis_file_path);
Expand Down Expand Up @@ -173,6 +176,10 @@ async fn main() {
tcp_port: tcp_socket_addr.port(),
node_id: local_node_id,
};
// Create Kademlia Table here so we can access it from rpc server (for syncing)
let peer_table = peer_table(signer.clone());
// Create SyncManager
let syncer = SyncManager::new(peer_table.clone(), snap_sync);

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
Expand All @@ -182,6 +189,7 @@ async fn main() {
store.clone(),
jwt_secret,
local_p2p_node,
syncer,
)
.into_future();

Expand Down Expand Up @@ -215,6 +223,7 @@ async fn main() {
tcp_socket_addr,
bootnodes,
signer,
peer_table,
store,
)
.into_future();
Expand Down Expand Up @@ -282,6 +291,19 @@ fn parse_socket_addr(addr: &str, port: &str) -> io::Result<SocketAddr> {
))
}

/// Decides whether snap sync was requested through the `syncmode` CLI argument.
///
/// Returns `true` when the argument is absent or equal to `"snap"`, and `false`
/// when it is `"full"`.
///
/// # Panics
///
/// Panics if `syncmode` was given any value other than `"snap"` or `"full"`.
fn is_snap_sync(matches: &clap::ArgMatches) -> bool {
    match matches.get_one::<String>("syncmode").map(String::as_str) {
        // A missing argument falls back to snap sync.
        None | Some("snap") => true,
        Some("full") => false,
        Some(other) => panic!("Invalid syncmode {other} expected either snap or full"),
    }
}

fn set_datadir(datadir: &str) -> String {
let project_dir = ProjectDirs::from("", "", datadir).expect("Couldn't find home directory");
project_dir
Expand Down
3 changes: 2 additions & 1 deletion crates/networking/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ edition = "2021"

[dependencies]
ethrex-core.workspace = true
ethrex-blockchain.workspace = true
ethrex-rlp.workspace = true
ethrex-storage.workspace = true
ethrex-blockchain.workspace = true
ethrex-trie.workspace = true

tracing.workspace = true
tokio.workspace = true
Expand Down
41 changes: 40 additions & 1 deletion crates/networking/p2p/kademlia.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{
discv4::{time_now_unix, FindNodeRequest},
peer_channels::PeerChannels,
types::Node,
};
use ethrex_core::{H256, H512, U256};
use sha3::{Digest, Keccak256};
use tokio::sync::mpsc::UnboundedSender;
use tracing::info;

pub const MAX_NODES_PER_BUCKET: usize = 16;
const NUMBER_OF_BUCKETS: usize = 256;
Expand Down Expand Up @@ -187,7 +189,7 @@ impl KademliaTable {
/// ## Dev note:
/// This function should be improved:
/// We might keep the `peers` list sorted by last_ping as we would avoid unnecessary loops
pub fn get_least_recently_pinged_peers(&mut self, limit: usize) -> Vec<PeerData> {
pub fn get_least_recently_pinged_peers(&self, limit: usize) -> Vec<PeerData> {
let mut peers = vec![];

for bucket in &self.buckets {
Expand Down Expand Up @@ -254,6 +256,40 @@ impl KademliaTable {

None
}

/// Stores `channels` on the peer identified by `node_id`.
///
/// The peer is looked up in the bucket given by
/// `bucket_number(self.local_node_id, node_id)`. If that bucket does not exist or
/// holds no peer with a matching node id, the call does nothing and `channels`
/// is dropped.
///
/// This function should be called each time a connection is established so the
/// backend can send requests to the peers.
pub fn set_channels(&mut self, node_id: H512, channels: PeerChannels) {
    let bucket_idx = bucket_number(self.local_node_id, node_id);
    if let Some(bucket) = self.buckets.get_mut(bucket_idx) {
        let target = bucket
            .peers
            .iter_mut()
            .find(|candidate| candidate.node.node_id == node_id);
        if let Some(peer) = target {
            peer.channels = Some(channels);
        }
    }
}

/// Picks a single peer from the table, or `None` if no peer is returned.
///
/// Currently this is the last entry of `get_least_recently_pinged_peers(1)`.
/// TODO: Randomly select peer
pub fn get_peer(&self) -> Option<PeerData> {
    let mut candidates = self.get_least_recently_pinged_peers(1);
    candidates.pop()
}

/// Returns the channel ends to an active peer connection.
///
/// The peer is obtained through `get_peer` (random selection is still a TODO), and
/// there is no guarantee that the selected peer is not currently busy.
/// When no peer with channels is found, the method sleeps for 10 seconds and
/// retries, looping until one is available.
///
/// TODO: Filter peers by capabilities, set max amount of retries
///
/// NOTE(review): this method borrows the table across the sleep; if callers hold a
/// lock guard on the table while awaiting it, the table presumably stays locked
/// for the whole wait — confirm against the call sites.
pub async fn get_peer_channels(&self) -> PeerChannels {
    loop {
        match self.get_peer().and_then(|peer| peer.channels) {
            Some(channels) => return channels,
            None => {
                info!("[Sync] No peers available, retrying in 10 sec");
                // Unlikely case: the node has just started and has no peers yet,
                // so wait a bit before trying again.
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
            }
        }
    }
}
}

/// Computes the distance between two nodes according to the discv4 protocol
Expand All @@ -279,6 +315,8 @@ pub struct PeerData {
pub liveness: u16,
/// if a revalidation was sent to the peer, the bool marks if it has answered
pub revalidation: Option<bool>,
/// communication channels between the peer data and its active connection
pub channels: Option<PeerChannels>,
}

impl PeerData {
Expand All @@ -292,6 +330,7 @@ impl PeerData {
last_ping_hash: None,
find_node_request: None,
revalidation: None,
channels: None,
}
}

Expand Down
17 changes: 12 additions & 5 deletions crates/networking/p2p/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use k256::{
ecdsa::SigningKey,
elliptic_curve::{sec1::ToEncodedPoint, PublicKey},
};
use kademlia::{bucket_number, KademliaTable, MAX_NODES_PER_BUCKET};
pub use kademlia::KademliaTable;
use kademlia::{bucket_number, MAX_NODES_PER_BUCKET};
use rand::rngs::OsRng;
use rlpx::{connection::RLPxConnection, message::Message as RLPxMessage};
use tokio::{
Expand All @@ -30,8 +31,10 @@ use types::{Endpoint, Node};
pub mod bootnode;
pub(crate) mod discv4;
pub(crate) mod kademlia;
pub mod peer_channels;
pub mod rlpx;
pub(crate) mod snap;
pub mod sync;
pub mod types;

const MAX_DISC_PACKET_SIZE: usize = 1280;
Expand All @@ -42,17 +45,21 @@ const MAX_DISC_PACKET_SIZE: usize = 1280;
// we should bump this limit.
const MAX_MESSAGES_TO_BROADCAST: usize = 1000;

/// Builds a new, shareable Kademlia table for the local node.
///
/// The local node id is derived from `signer` via `node_id_from_signing_key`, and
/// the resulting table is wrapped in `Arc<Mutex<_>>` so it can be handed to
/// several components.
pub fn peer_table(signer: SigningKey) -> Arc<Mutex<KademliaTable>> {
    let table = KademliaTable::new(node_id_from_signing_key(&signer));
    Arc::new(Mutex::new(table))
}

pub async fn start_network(
udp_addr: SocketAddr,
tcp_addr: SocketAddr,
bootnodes: Vec<BootNode>,
signer: SigningKey,
peer_table: Arc<Mutex<KademliaTable>>,
storage: Store,
) {
info!("Starting discovery service at {udp_addr}");
info!("Listening for requests at {tcp_addr}");
let local_node_id = node_id_from_signing_key(&signer);
let table = Arc::new(Mutex::new(KademliaTable::new(local_node_id)));
let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<(
tokio::task::Id,
Arc<RLPxMessage>,
Expand All @@ -61,15 +68,15 @@ pub async fn start_network(
udp_addr,
signer.clone(),
storage.clone(),
table.clone(),
peer_table.clone(),
bootnodes,
channel_broadcast_send_end.clone(),
));
let server_handle = tokio::spawn(serve_requests(
tcp_addr,
signer.clone(),
storage.clone(),
table.clone(),
peer_table.clone(),
channel_broadcast_send_end,
));

Expand Down
Loading

0 comments on commit 9a2f8e0

Please sign in to comment.