diff --git a/.github/workflows/ci_l1.yaml b/.github/workflows/ci_l1.yaml
index 22bd57951..d305a2fd4 100644
--- a/.github/workflows/ci_l1.yaml
+++ b/.github/workflows/ci_l1.yaml
@@ -181,6 +181,9 @@ jobs:
           - name: "Paris Engine tests"
             simulation: ethereum/engine
            test_pattern: "engine-api/RPC|Re-Org Back to Canonical Chain From Syncing Chain|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao|Fork ID|Unknown|Invalid PayloadAttributes|Bad Hash|Unique Payload ID|Re-Execute Payload|In-Order|Multiple New Payloads|Valid NewPayload|NewPayload with|Invalid NewPayload|Payload Build|Invalid NewPayload, Transaction|ParentHash equals|Build Payload|Invalid Missing Ancestor ReOrg"
+          - name: "Sync"
+            simulation: ethereum/sync
+            test_pattern: ""
     steps:
       - name: Checkout sources
         uses: actions/checkout@v4
diff --git a/crates/networking/docs/Sync.md b/crates/networking/docs/Sync.md
index 8beedf312..836b804e1 100644
--- a/crates/networking/docs/Sync.md
+++ b/crates/networking/docs/Sync.md
@@ -2,23 +2,33 @@
 ## Snap Sync
 
-A snap sync cycle begins by fetching all the block headers (via p2p) between the current head (latest canonical block) and the sync head (block hash sent by a forkChoiceUpdate).
-The next two steps are performed in parallel:
-On one side, blocks and receipts for all fetched headers are fetched via p2p and stored.
+A snap sync cycle begins by fetching all the block headers (via eth p2p) between the current head (latest canonical block) and the sync head (block hash sent by a forkChoiceUpdated message).
 
-On the other side, the state is reconstructed via p2p snap requests. Our current implementation of this works as follows:
-We will spawn two processes, the `bytecode_fetcher` which will remain active and process bytecode requests in batches by requesting bytecode batches from peers and storing them, and the `fetch_snap_state` process, which will iterate over the fetched headers and rebuild the block's state via `rebuild_state_trie`.
+We will then fetch the block bodies for each header and, at the same time, select a pivot block (sync head - 64) and start rebuilding its state via snap p2p requests. If the pivot becomes stale during this rebuild we will select a newer pivot (the sync head) and restart the rebuild.
 
-`rebuild_state_trie` will spawn a `storage_fetcher` process (which works similarly to the `bytecode_fetcher` and is kept alive for the duration of the rebuild process), it will open a new state trie and will fetch the block's accounts in batches and for each account it will: send the account's code hash to the `bytecode_fetcher` (if not empty), send the account's address and storage root to the `storage_fetcher` (if not empty), and add the account to the state trie. Once all accounts are processed, the final state root will be checked and committed.
+After we have fully rebuilt the pivot state and fetched all the block bodies, we will fetch and store the receipts for the range between the current head and the pivot (inclusive), store all blocks in the same range, and execute all blocks after the pivot (as in full sync).
 
-(Not implemented yet) When `fetch_snap_state` runs out of available state (aka, the state we need to fetch is older than 128 blocks and peers don't provide it), it will begin the `state_healing` process.
 
 This diagram illustrates the process described above:
 
 ![snap_sync](/crates/networking/docs/diagrams/snap_sync.jpg)
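A minimal sketch of the pivot choice described above, assuming at least one header was fetched and that headers are ordered from current head to sync head; `MIN_FULL_BLOCKS` mirrors the constant introduced in `sync.rs` further down this diff:

```rust
/// The minimum amount of blocks from the head that we want to full-sync (mirrors sync.rs below)
const MIN_FULL_BLOCKS: usize = 64;

// Sketch: pick the pivot index among `header_count` fetched headers
// (ordered from current head to sync head). Assumes header_count >= 1.
fn select_pivot_idx(header_count: usize) -> usize {
    if header_count > MIN_FULL_BLOCKS {
        // Prefer sync head - 64 so the pivot's state is still served by peers
        header_count - MIN_FULL_BLOCKS
    } else {
        // Too few headers: fall back to the sync head itself
        header_count - 1
    }
}

fn main() {
    assert_eq!(select_pivot_idx(1000), 936); // sync head - 64
    assert_eq!(select_pivot_idx(10), 9); // the sync head itself
}
```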
+
+### Snap State Rebuild
+
+During snap sync we need to fully rebuild the pivot block's state. We can divide this process into the initial sync and the healing phase.
+
+For the first phase we will spawn two processes, the `bytecode_fetcher` and the `storage_fetcher`, which will both remain active, listening for requests from the main rebuild process and queuing and processing those requests in fixed-size batches (more on this later). The main rebuild process will then request the full range of accounts from the pivot block's state trie via p2p snap requests. For each account in an obtained range we will send its code hash and storage root to the `bytecode_fetcher` and `storage_fetcher` respectively for fetching. Once we have fetched all accounts (or the account state is no longer available), we will signal the `storage_fetcher` to finish all pending requests and move on to the next phase, while keeping the `bytecode_fetcher` active.
+
+In the healing phase we will spawn another queue-like process called `storage_healer` and begin requesting state trie nodes. We start by requesting the pivot block's state root node and proceed by requesting each node's children (if they are not already part of the state) until we have the full trie stored (aka all child nodes are known). For each fetched leaf node we will send its code hash to the `bytecode_fetcher` and its account hash to the `storage_healer`.
+
+The `storage_healer` will keep a list of pending account hashes and paths, adding new entries either by adding the root node of an account's storage trie when it receives an account hash from the main process, or by adding the unknown children of nodes returned by peers.
 
-The `bytecode_fetcher` has its own channel where it receives code hashes from active `rebuild_state_trie` processes. Once a code hash is received, it is added to a pending queue. When the queue has enough messages for a full batch it will request a batch of bytecodes via snap p2p and store them. If a bytecode could not be fetched by the request (aka, we reached the response limit) it is added back to the pending queue. After the whole state is synced `fetch_snap_state` will send an empty list to the `bytecode_fetcher` to signal the end of the requests so it can request the last (incomplete) bytecode batch and end gracefully.
 
 This diagram illustrates the process described above:
-![snap_sync](/crates/networking/docs/diagrams/snap_sync.jpg)
+![rebuild_state](/crates/networking/docs/diagrams/rebuild_state_trie.jpg)
 
-The `storage_fetcher` works almost alike, but one will be spawned for each `rebuild_state_trie` process as we can't fetch storages from different blocks in the same request.
+To exemplify how the queue-like processes work we will explain how the `bytecode_fetcher` works:
+
+The `bytecode_fetcher` has its own channel where it receives code hashes from an active `rebuild_state_trie` process. Once a code hash is received, it is added to a pending queue. When the queue has enough messages for a full batch it will request a batch of bytecodes via snap p2p and store them. If a bytecode could not be fetched by the request (aka, we reached the response limit) it is added back to the pending queue. After the whole state is synced, `rebuild_state_trie` will send an empty list to the `bytecode_fetcher` to signal the end of the requests so it can request the last (incomplete) bytecode batch and end gracefully.
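A condensed, standalone sketch of the queue-like pattern just described, assuming a tokio mpsc channel; `fetch_batch` is a hypothetical stand-in for the real peer request plus store logic:

```rust
use tokio::sync::mpsc::Receiver;

const BATCH_SIZE: usize = 200;

// Sketch: receive hash batches, queue them, and process them in fixed-size
// batches; an empty message (or a closed channel) signals the end of requests.
async fn queue_fetcher(mut receiver: Receiver<Vec<[u8; 32]>>) {
    let mut pending: Vec<[u8; 32]> = vec![];
    let mut incoming = true;
    while incoming {
        match receiver.recv().await {
            Some(hashes) if !hashes.is_empty() => pending.extend(hashes),
            _ => incoming = false, // end-of-requests signal
        }
        // Full batches while the channel is open, plus a final partial batch
        while pending.len() >= BATCH_SIZE || (!incoming && !pending.is_empty()) {
            let batch: Vec<_> = pending.drain(..BATCH_SIZE.min(pending.len())).collect();
            // Anything the peer couldn't answer (response limit) is re-queued
            pending.extend(fetch_batch(batch).await);
        }
    }
}

// Hypothetical helper: request the batch from a peer, store the results,
// and return the hashes that were left unfetched.
async fn fetch_batch(_batch: Vec<[u8; 32]>) -> Vec<[u8; 32]> {
    vec![]
}
```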
+
+This diagram illustrates the process described above:
+
+![snap_sync](/crates/networking/docs/diagrams/bytecode_fetcher.jpg)
diff --git a/crates/networking/docs/diagrams/rebuild_state_trie.jpg b/crates/networking/docs/diagrams/rebuild_state_trie.jpg
new file mode 100644
index 000000000..4ac255e82
Binary files /dev/null and b/crates/networking/docs/diagrams/rebuild_state_trie.jpg differ
diff --git a/crates/networking/docs/diagrams/snap_sync.jpg b/crates/networking/docs/diagrams/snap_sync.jpg
index 7f2c93ae1..79b9ff3be 100644
Binary files a/crates/networking/docs/diagrams/snap_sync.jpg and b/crates/networking/docs/diagrams/snap_sync.jpg differ
diff --git a/crates/networking/p2p/peer_channels.rs b/crates/networking/p2p/peer_channels.rs
index 4a722805e..ba01aa1fa 100644
--- a/crates/networking/p2p/peer_channels.rs
+++ b/crates/networking/p2p/peer_channels.rs
@@ -1,21 +1,26 @@
-use std::{sync::Arc, time::Duration};
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
 use bytes::Bytes;
 use ethrex_core::{
-    types::{AccountState, BlockBody, BlockHeader},
+    types::{AccountState, BlockBody, BlockHeader, Receipt},
     H256, U256,
 };
 use ethrex_rlp::encode::RLPEncode;
-use ethrex_trie::verify_range;
+use ethrex_trie::Nibbles;
+use ethrex_trie::{verify_range, Node};
 use tokio::sync::{mpsc, Mutex};
 
 use crate::{
     rlpx::{
-        eth::blocks::{
-            BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
+        eth::{
+            blocks::{
+                BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
+            },
+            receipts::{GetReceipts, Receipts},
         },
         snap::{
-            AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, StorageRanges,
+            AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes,
+            StorageRanges, TrieNodes,
         },
     },
     snap::encodable_to_proof,
@@ -121,6 +126,38 @@ impl PeerChannels {
         (!block_bodies.is_empty() && block_bodies.len() <= block_hashes_len).then_some(block_bodies)
     }
 
+    /// Requests all receipts in a set of blocks from the peer given their block hashes
+    /// Returns the lists of receipts or None if:
+    /// - There are no available peers (the node just started up or was rejected by all other nodes)
+    /// - The response timed out
+    /// - The response was empty or not valid
+    pub async fn request_receipts(&self, block_hashes: Vec<H256>) -> Option<Vec<Vec<Receipt>>> {
+        let block_hashes_len = block_hashes.len();
+        let request_id = rand::random();
+        let request = RLPxMessage::GetReceipts(GetReceipts {
+            id: request_id,
+            block_hashes,
+        });
+        self.sender.send(request).await.ok()?;
+        let mut receiver = self.receiver.lock().await;
+        let receipts = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
+            loop {
+                match receiver.recv().await {
+                    Some(RLPxMessage::Receipts(Receipts { id, receipts })) if id == request_id => {
+                        return Some(receipts)
+                    }
+                    // Ignore replies that don't match the expected id (such as late responses)
+                    Some(_) => continue,
+                    None => return None,
+                }
+            }
+        })
+        .await
+        .ok()??;
+        // Check that the response is not empty and does not contain more receipts than the ones requested
+        (!receipts.is_empty() && receipts.len() <= block_hashes_len).then_some(receipts)
+    }
+
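All request helpers in this file follow the same correlation pattern: tag the request with a random id, skip replies carrying a different id, and bound the wait with a timeout. A stripped-down sketch of just that pattern (the `(id, payload)` tuple and the 45-second timeout are placeholders, not the crate's actual types):

```rust
use std::time::Duration;
use tokio::sync::mpsc::Receiver;

// Sketch: wait for the reply matching `request_id`, ignoring stale replies
// (e.g. late answers to earlier, timed-out requests) and bounding the wait.
async fn await_reply(
    receiver: &mut Receiver<(u64, Vec<u8>)>,
    request_id: u64,
) -> Option<Vec<u8>> {
    tokio::time::timeout(Duration::from_secs(45), async move {
        loop {
            match receiver.recv().await {
                Some((id, payload)) if id == request_id => return Some(payload),
                Some(_) => continue, // mismatched id: stale reply, keep waiting
                None => return None, // channel closed
            }
        }
    })
    .await
    .ok()? // timeout elapsed
}
```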
     /// Requests an account range from the peer given the state trie's root and the starting hash (the limit hash will be the maximum value of H256)
     /// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie
     /// Returns the response message or None if:
@@ -318,4 +355,112 @@ impl PeerChannels {
         }
         Some((storage_keys, storage_values, should_continue))
     }
+
+    /// Requests state trie nodes given the root of the trie where they are contained and their paths (whether full or partial)
+    /// Returns the nodes or None if:
+    /// - There are no available peers (the node just started up or was rejected by all other nodes)
+    /// - The response timed out
+    /// - The response was empty or not valid
+    pub async fn request_state_trienodes(
+        &self,
+        state_root: H256,
+        paths: Vec<Nibbles>,
+    ) -> Option<Vec<Node>> {
+        let request_id = rand::random();
+        let expected_nodes = paths.len();
+        let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
+            id: request_id,
+            root_hash: state_root,
+            // [acc_path, acc_path,...] -> [[acc_path], [acc_path]]
+            paths: paths
+                .into_iter()
+                .map(|vec| vec![Bytes::from(vec.encode_compact())])
+                .collect(),
+            bytes: MAX_RESPONSE_BYTES,
+        });
+        self.sender.send(request).await.ok()?;
+        let mut receiver = self.receiver.lock().await;
+        let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
+            loop {
+                match receiver.recv().await {
+                    Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => {
+                        return Some(nodes)
+                    }
+                    // Ignore replies that don't match the expected id (such as late responses)
+                    Some(_) => continue,
+                    None => return None,
+                }
+            }
+        })
+        .await
+        .ok()??;
+        (!nodes.is_empty() && nodes.len() <= expected_nodes)
+            .then(|| {
+                nodes
+                    .iter()
+                    .map(|node| Node::decode_raw(node))
+                    .collect::<Result<Vec<_>, _>>()
+                    .ok()
+            })
+            .flatten()
+    }
+
+    /// Requests storage trie nodes given the root of the state trie where they are contained and
+    /// a map from each account's path in the state trie (aka hashed address) to the paths of the nodes in its storage trie (full or partial)
+    /// Returns the nodes or None if:
+    /// - There are no available peers (the node just started up or was rejected by all other nodes)
+    /// - The response timed out
+    /// - The response was empty or not valid
+    pub async fn request_storage_trienodes(
+        &self,
+        state_root: H256,
+        paths: BTreeMap<H256, Vec<Nibbles>>,
+    ) -> Option<Vec<Node>> {
+        let request_id = rand::random();
+        let expected_nodes = paths.iter().fold(0, |acc, item| acc + item.1.len());
+        let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
+            id: request_id,
+            root_hash: state_root,
+            // {acc_path: [path, path, ...]} -> [[acc_path, path, path, ...]]
+            paths: paths
+                .into_iter()
+                .map(|(acc_path, paths)| {
+                    [
+                        vec![Bytes::from(acc_path.0.to_vec())],
+                        paths
+                            .into_iter()
+                            .map(|path| Bytes::from(path.encode_compact()))
+                            .collect(),
+                    ]
+                    .concat()
+                })
+                .collect(),
+            bytes: MAX_RESPONSE_BYTES,
+        });
+        self.sender.send(request).await.ok()?;
+        let mut receiver = self.receiver.lock().await;
+        let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
+            loop {
+                match receiver.recv().await {
+                    Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => {
+                        return Some(nodes)
+                    }
+                    // Ignore replies that don't match the expected id (such as late responses)
+                    Some(_) => continue,
+                    None => return None,
+                }
+            }
+        })
+        .await
+        .ok()??;
+        (!nodes.is_empty() && nodes.len() <= expected_nodes)
+            .then(|| {
+                nodes
+                    .iter()
+                    .map(|node| Node::decode_raw(node))
+                    .collect::<Result<Vec<_>, _>>()
+                    .ok()
+            })
+            .flatten()
+    }
 }
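The nested path layout expected by `GetTrieNodes` is easy to get wrong, so here is the flattening rule in isolation; a sketch with `Vec<u8>` standing in for compact-encoded path bytes:

```rust
use std::collections::BTreeMap;

// Sketch of the snap GetTrieNodes path layout used above:
// - state trie request:   [[acc_path], [acc_path], ...]      (singleton lists)
// - storage trie request: [[acc_path, path, path, ...], ...] (account path first)
fn state_paths(paths: Vec<Vec<u8>>) -> Vec<Vec<Vec<u8>>> {
    paths.into_iter().map(|p| vec![p]).collect()
}

fn storage_paths(paths: BTreeMap<Vec<u8>, Vec<Vec<u8>>>) -> Vec<Vec<Vec<u8>>> {
    paths
        .into_iter()
        .map(|(acc_path, node_paths)| [vec![acc_path], node_paths].concat())
        .collect()
}

fn main() {
    assert_eq!(state_paths(vec![vec![0xab]]), vec![vec![vec![0xab]]]);
    let mut map = BTreeMap::new();
    map.insert(vec![0x01], vec![vec![0x02], vec![0x03]]);
    assert_eq!(
        storage_paths(map),
        vec![vec![vec![0x01], vec![0x02], vec![0x03]]]
    );
}
```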
diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 8efc7cb4e..4c415ef37 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -1,13 +1,12 @@
-use std::sync::Arc;
-
 use ethrex_blockchain::error::ChainError;
 use ethrex_core::{
-    types::{Block, BlockHash, BlockHeader, EMPTY_KECCACK_HASH},
+    types::{AccountState, Block, BlockBody, BlockHash, BlockHeader, EMPTY_KECCACK_HASH},
     H256,
 };
-use ethrex_rlp::encode::RLPEncode;
+use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError};
 use ethrex_storage::{error::StoreError, Store};
-use ethrex_trie::EMPTY_TRIE_HASH;
+use ethrex_trie::{Nibbles, Node, TrieError, TrieState, EMPTY_TRIE_HASH};
+use std::{collections::BTreeMap, sync::Arc};
 use tokio::{
     sync::{
         mpsc::{self, error::SendError, Receiver, Sender},
@@ -19,6 +18,12 @@ use tracing::{debug, info, warn};
 
 use crate::kademlia::KademliaTable;
 
+/// Maximum amount of times we will ask a peer for an account/storage range
+/// If the max amount of retries is exceeded we will assume that the state we are requesting is old and no longer available
+const MAX_RETRIES: usize = 10;
+/// The minimum amount of blocks from the head that we want to full-sync during a snap sync
+const MIN_FULL_BLOCKS: usize = 64;
+
 #[derive(Debug)]
 pub enum SyncMode {
     Full,
@@ -31,11 +36,19 @@
 pub struct SyncManager {
     sync_mode: SyncMode,
     peers: Arc<Mutex<KademliaTable>>,
+    /// The last block number used as a pivot for snap-sync
+    /// Syncing beyond this pivot should re-enable snap-sync (as we will not have that state stored)
+    /// TODO: Reorgs
+    last_snap_pivot: u64,
 }
 
 impl SyncManager {
     pub fn new(peers: Arc<Mutex<KademliaTable>>, sync_mode: SyncMode) -> Self {
-        Self { sync_mode, peers }
+        Self {
+            sync_mode,
+            peers,
+            last_snap_pivot: 0,
+        }
     }
 
     /// Creates a dummy SyncManager for tests where syncing is not needed
@@ -45,6 +58,7 @@
         Self {
             sync_mode: SyncMode::Full,
             peers: dummy_peer_table,
+            last_snap_pivot: 0,
         }
     }
 
@@ -114,48 +128,61 @@
         match self.sync_mode {
             SyncMode::Snap => {
                 // snap-sync: launch tasks to fetch blocks and state in parallel
-                // - Fetch each block's state via snap p2p requests
-                // - Fetch each blocks and its receipts via eth p2p requests
-                // TODO: We are currently testing against our implementation that doesn't hold an independant snapshot and can provide all historic state
-                // We should fetch all available state and then resort to state healing to fetch the rest
-                let (bytecode_sender, bytecode_receiver) = mpsc::channel::<Vec<H256>>(500);
-                let mut set = tokio::task::JoinSet::new();
-                set.spawn(bytecode_fetcher(
-                    bytecode_receiver,
-                    self.peers.clone(),
-                    store.clone(),
-                ));
-                set.spawn(fetch_blocks_and_receipts(
+                // - Fetch each block's body and its receipts via eth p2p requests
+                // - Fetch the pivot block's state via snap p2p requests
+                // - Execute blocks after the pivot (like in full-sync)
+                let fetch_bodies_handle = tokio::spawn(fetch_block_bodies(
                     all_block_hashes.clone(),
                     self.peers.clone(),
-                    store.clone(),
                 ));
-                let state_roots = all_block_headers
-                    .iter()
-                    .map(|header| header.state_root)
-                    .collect::<Vec<_>>();
-                set.spawn(fetch_snap_state(
-                    bytecode_sender,
-                    state_roots.clone(),
+                let mut pivot_idx = if all_block_headers.len() > MIN_FULL_BLOCKS {
+                    all_block_headers.len() - MIN_FULL_BLOCKS
+                } else {
+                    all_block_headers.len() - 1
+                };
+                let mut pivot_root = all_block_headers[pivot_idx].state_root;
+                let mut pivot_number = all_block_headers[pivot_idx].number;
+
+                let mut stale_pivot =
+                    !rebuild_state_trie(pivot_root, self.peers.clone(), store.clone()).await?;
+                // If the pivot became stale, set a newer pivot and try again
+                if stale_pivot && pivot_idx != all_block_headers.len() - 1 {
+                    warn!("Stale pivot, switching to newer head");
+                    pivot_idx = all_block_headers.len() - 1;
+                    pivot_root = all_block_headers[pivot_idx].state_root;
+                    pivot_number = all_block_headers[pivot_idx].number;
+                    stale_pivot =
+                        !rebuild_state_trie(pivot_root, self.peers.clone(), store.clone()).await?;
+                }
+                if stale_pivot {
+                    warn!("Stale pivot, aborting sync");
+                    return Ok(());
+                }
+                // Wait for all bodies to be downloaded
+                let all_block_bodies = fetch_bodies_handle.await??;
+                // For all blocks before the pivot: Store the bodies and fetch the receipts
+                // For all blocks after the pivot: Process them fully
+                let store_receipts_handle = tokio::spawn(store_receipts(
+                    all_block_hashes[pivot_idx..].to_vec(),
                     self.peers.clone(),
                     store.clone(),
                 ));
-                // Store headers
-                let mut latest_block_number = 0;
-                for (header, hash) in all_block_headers
-                    .into_iter()
-                    .zip(all_block_hashes.into_iter())
-                {
-                    latest_block_number = header.number;
-                    store.set_canonical_block(header.number, hash)?;
-                    store.add_block_header(hash, header)?;
-                }
-                // If all processes failed then they are likely to have a common cause (such as unaccessible storage), so return the first error
-                for result in set.join_all().await {
-                    result?;
+                for (hash, (header, body)) in all_block_hashes.into_iter().zip(
+                    all_block_headers
+                        .into_iter()
+                        .zip(all_block_bodies.into_iter()),
+                ) {
+                    if header.number <= pivot_number {
+                        store.set_canonical_block(header.number, hash)?;
+                        store.add_block(Block::new(header, body))?;
+                    } else {
+                        store.set_canonical_block(header.number, hash)?;
+                        store.update_latest_block_number(header.number)?;
+                        ethrex_blockchain::add_block(&Block::new(header, body), &store)?;
+                    }
                 }
-                // Set latest block number here to avoid reading state that is currently being synced
-                store.update_latest_block_number(latest_block_number)?;
+                store_receipts_handle.await??;
+                self.last_snap_pivot = pivot_number;
             }
             SyncMode::Full => {
                 // full-sync: Fetch all block bodies and execute them sequentially to build the state
@@ -210,71 +237,69 @@ async fn download_and_run_blocks(
     Ok(())
 }
 
-async fn fetch_blocks_and_receipts(
+/// Fetches all block bodies for the given block hashes via p2p and returns them
+async fn fetch_block_bodies(
     mut block_hashes: Vec<BlockHash>,
     peers: Arc<Mutex<KademliaTable>>,
-    store: Store,
-) -> Result<(), SyncError> {
-    // Snap state fetching will take much longer than this so we don't need to paralelize fetching blocks and receipts
-    // Fetch Block Bodies
+) -> Result<Vec<BlockBody>, SyncError> {
+    let mut all_block_bodies = Vec::new();
     loop {
         let peer = peers.lock().await.get_peer_channels().await;
         debug!("Requesting Block Headers ");
         if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await {
             debug!(" Received {} Block Bodies", block_bodies.len());
             // Track which bodies we have already fetched
-            let (fetched_hashes, remaining_hashes) = block_hashes.split_at(block_bodies.len());
-            // Store Block Bodies
-            for (hash, body) in fetched_hashes.iter().zip(block_bodies.into_iter()) {
-                store.add_block_body(*hash, body)?
-            }
-
+            block_hashes.drain(0..block_bodies.len());
+            all_block_bodies.extend(block_bodies);
             // Check if we need to ask for another batch
-            if remaining_hashes.is_empty() {
+            if block_hashes.is_empty() {
                 break;
-            } else {
-                block_hashes = remaining_hashes.to_vec();
             }
         }
     }
-    // TODO: Fetch Receipts and store them
-    Ok(())
+    Ok(all_block_bodies)
 }
 
-async fn fetch_snap_state(
-    bytecode_sender: Sender<Vec<H256>>,
-    state_roots: Vec<H256>,
+/// Fetches all receipts for the given block hashes via p2p and stores them
+async fn store_receipts(
+    mut block_hashes: Vec<BlockHash>,
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
 ) -> Result<(), SyncError> {
-    debug!("Syncing state roots: {}", state_roots.len());
-    // Fetch newer state first: This will be useful to detect where to switch to healing
-    for state_root in state_roots.into_iter().rev() {
-        // TODO: maybe spawn taks here instead of awaiting
-        rebuild_state_trie(
-            bytecode_sender.clone(),
-            state_root,
-            peers.clone(),
-            store.clone(),
-        )
-        .await?
+    loop {
+        let peer = peers.lock().await.get_peer_channels().await;
+        debug!("Requesting Receipts");
+        if let Some(receipts) = peer.request_receipts(block_hashes.clone()).await {
+            debug!(" Received {} Receipts", receipts.len());
+            // Track which blocks we have already fetched receipts for
+            for (block_hash, receipts) in block_hashes.drain(0..receipts.len()).zip(receipts) {
+                store.add_receipts(block_hash, receipts)?;
+            }
+            // Check if we need to ask for another batch
+            if block_hashes.is_empty() {
+                break;
+            }
+        }
     }
-    // We finished syncing the available state, lets make the fetcher processes aware
-    // Send empty batches to signal that no more batches are incoming
-    bytecode_sender.send(vec![]).await?;
     Ok(())
 }
 
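Both loops above rely on peers being allowed to answer with fewer items than requested: only the answered prefix of the pending list is consumed and the remainder is re-requested on the next iteration. A toy version of that bookkeeping (`u64` standing in for a block hash):

```rust
// Sketch: consume only the prefix of pending hashes that the peer answered.
fn consume_fetched(pending: &mut Vec<u64>, fetched: usize) -> Vec<u64> {
    pending.drain(0..fetched.min(pending.len())).collect()
}

fn main() {
    let mut pending = vec![1, 2, 3, 4, 5];
    let answered = consume_fetched(&mut pending, 3); // peer returned 3 of 5
    assert_eq!(answered, vec![1, 2, 3]);
    assert_eq!(pending, vec![4, 5]); // re-requested on the next loop iteration
}
```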
-/// Rebuilds a Block's state trie by requesting snap state from peers
+/// Rebuilds a Block's state trie by requesting snap state from peers, also performs state healing
+/// Returns true if all state was fetched or false if the block is too old and the state is no longer available
 async fn rebuild_state_trie(
-    bytecode_sender: Sender<Vec<H256>>,
     state_root: H256,
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
-) -> Result<(), SyncError> {
-    // Spawn a storage fetcher for this blocks's storage
+) -> Result<bool, SyncError> {
+    // Spawn storage & bytecode fetchers
+    let (bytecode_sender, bytecode_receiver) = mpsc::channel::<Vec<H256>>(500);
     let (storage_sender, storage_receiver) = mpsc::channel::<Vec<(H256, H256)>>(500);
-    let storage_fetcher_handler = tokio::spawn(storage_fetcher(
+    let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher(
+        bytecode_receiver,
+        peers.clone(),
+        store.clone(),
+    ));
+    let storage_fetcher_handle = tokio::spawn(storage_fetcher(
         storage_receiver,
         peers.clone(),
         store.clone(),
@@ -285,7 +310,8 @@
     // We cannot keep an open trie here so we will track the root between lookups
     let mut current_state_root = *EMPTY_TRIE_HASH;
     // Fetch Account Ranges
-    loop {
+    // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
+    for _ in 0..MAX_RETRIES {
         let peer = peers.clone().lock().await.get_peer_channels().await;
         debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
         if let Some((account_hashes, accounts, should_continue)) = peer
@@ -308,10 +334,10 @@
                     code_hashes.push(account.code_hash)
                 }
                 // Build the batch of hashes and roots to send to the storage fetcher
-                // Ignore accounts without storage
-                // TODO: We could also check if the account's storage root is already part of the trie
-                // Aka, if the account was not changed shouldn't fetch the state we already have
-                if account.storage_root != *EMPTY_TRIE_HASH {
+                // Ignore accounts without storage and accounts whose storage hasn't changed from our currently stored state
+                if account.storage_root != *EMPTY_TRIE_HASH
+                    && !store.contains_storage_node(*account_hash, account.storage_root)?
+                {
                     account_hashes_and_storage_roots.push((*account_hash, account.storage_root));
                 }
             }
@@ -328,10 +354,9 @@
             // Update trie
             let mut trie = store.open_state_trie(current_state_root);
             for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
-                trie.insert(account_hash.0.to_vec(), account.encode_to_vec())
-                    .map_err(StoreError::Trie)?;
+                trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
             }
-            current_state_root = trie.hash().map_err(StoreError::Trie)?;
+            current_state_root = trie.hash()?;
 
             if !should_continue {
                 // All accounts fetched!
@@ -339,16 +364,21 @@
             }
         }
     }
-    if current_state_root != state_root {
-        warn!("State sync failed for state root {state_root}");
-    }
     // Send empty batch to signal that no more batches are incoming
     storage_sender.send(vec![]).await?;
-    storage_fetcher_handler
-        .await
-        .map_err(|_| StoreError::Custom(String::from("Failed to join storage_fetcher task")))??;
-    debug!("Completed state sync for state root {state_root}");
-    Ok(())
+    storage_fetcher_handle.await??;
+    let sync_complete = if current_state_root == state_root {
+        debug!("Completed state sync for state root {state_root}");
+        true
+    } else {
+        // Perform state healing to fix any potential inconsistency in the rebuilt tries
+        // (only needed because we may have fetched chunks of the same trie from different pivot states)
+        heal_state_trie(bytecode_sender.clone(), state_root, store, peers).await?
+    };
+    // Send empty batch to signal that no more batches are incoming
+    bytecode_sender.send(vec![]).await?;
+    bytecode_fetcher_handle.await??;
+    Ok(sync_complete)
 }
 
 /// Waits for incoming code hashes from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches
@@ -358,34 +388,27 @@
     store: Store,
 ) -> Result<(), SyncError> {
     const BATCH_SIZE: usize = 200;
-    // Pending list of bytecodes to fetch
     let mut pending_bytecodes: Vec<H256> = vec![];
-    loop {
+    let mut incoming = true;
+    while incoming {
+        // Fetch incoming requests
         match receiver.recv().await {
             Some(code_hashes) if !code_hashes.is_empty() => {
-                // Add hashes to the queue
                 pending_bytecodes.extend(code_hashes);
-                // If we have enought pending bytecodes to fill a batch, spawn a fetch process
-                while pending_bytecodes.len() >= BATCH_SIZE {
-                    let next_batch = pending_bytecodes.drain(..BATCH_SIZE).collect::<Vec<_>>();
-                    let remaining =
-                        fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?;
-                    // Add unfeched bytecodes back to the queue
-                    pending_bytecodes.extend(remaining);
-                }
             }
             // Disconnect / Empty message signaling no more bytecodes to sync
-            _ => break,
+            _ => incoming = false,
+        }
+        // If we have enough pending bytecodes to fill a batch
+        // or if we have no more incoming batches, spawn a fetch process
+        while pending_bytecodes.len() >= BATCH_SIZE || !incoming && !pending_bytecodes.is_empty() {
+            let next_batch = pending_bytecodes
+                .drain(..BATCH_SIZE.min(pending_bytecodes.len()))
+                .collect::<Vec<_>>();
+            let remaining = fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?;
+            // Add unfetched bytecodes back to the queue
+            pending_bytecodes.extend(remaining);
         }
-    }
-    // We have no more incoming requests, process the remaining batches
-    while !pending_bytecodes.is_empty() {
-        let next_batch = pending_bytecodes
-            .drain(..BATCH_SIZE.min(pending_bytecodes.len()))
-            .collect::<Vec<_>>();
-        let remaining = fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?;
-        // Add unfeched bytecodes back to the queue
-        pending_bytecodes.extend(remaining);
     }
     Ok(())
 }
@@ -418,38 +441,31 @@
     state_root: H256,
 ) -> Result<(), StoreError> {
     const BATCH_SIZE: usize = 100;
-    // Pending list of bytecodes to fetch
+    // Pending list of storages to fetch
     let mut pending_storage: Vec<(H256, H256)> = vec![];
     // TODO: Also add a queue for storages that were incompletely fecthed,
     // but for the first iteration we will asume not fully fetched -> fetch again
-    loop {
+    let mut incoming = true;
+    while incoming {
+        // Fetch incoming requests
         match receiver.recv().await {
-            Some(account_and_root) if !account_and_root.is_empty() => {
-                // Add hashes to the queue
-                pending_storage.extend(account_and_root);
-                // If we have enought pending bytecodes to fill a batch, spawn a fetch process
-                while pending_storage.len() >= BATCH_SIZE {
-                    let next_batch = pending_storage.drain(..BATCH_SIZE).collect::<Vec<_>>();
-                    let remaining =
-                        fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone())
-                            .await?;
-                    // Add unfeched bytecodes back to the queue
-                    pending_storage.extend(remaining);
-                }
+            Some(account_hashes_and_roots) if !account_hashes_and_roots.is_empty() => {
+                pending_storage.extend(account_hashes_and_roots);
             }
             // Disconnect / Empty message signaling no more bytecodes to sync
-            _ => break,
+            _ => incoming = false,
+        }
+        // If we have enough pending storages to fill a batch
+        // or if we have no more incoming batches, spawn a fetch process
+        while pending_storage.len() >= BATCH_SIZE || !incoming && !pending_storage.is_empty() {
+            let next_batch = pending_storage
+                .drain(..BATCH_SIZE.min(pending_storage.len()))
+                .collect::<Vec<_>>();
+            let remaining =
+                fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()).await?;
+            // Add unfetched storages back to the queue
+            pending_storage.extend(remaining);
+        }
-    }
-    // We have no more incoming requests, process the remaining batches
-    while !pending_storage.is_empty() {
-        let next_batch = pending_storage
-            .drain(..BATCH_SIZE.min(pending_storage.len()))
-            .collect::<Vec<_>>();
-        let remaining =
-            fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()).await?;
-        // Add unfeched bytecodes back to the queue
-        pending_storage.extend(remaining);
     }
     Ok(())
 }
@@ -461,7 +477,7 @@
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
 ) -> Result<Vec<(H256, H256)>, StoreError> {
-    loop {
+    for _ in 0..MAX_RETRIES {
         let peer = peers.lock().await.get_peer_channels().await;
         let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip();
         if let Some((mut keys, mut values, incomplete)) = peer
@@ -492,6 +508,205 @@
             return Ok(batch);
         }
     }
+    // This is a corner case where we fetched an account range for a block but the chain has moved on and the block
+    // was dropped by the peer's snapshot. We will keep the fetcher alive to avoid errors and stop fetching from the next account onwards
+    Ok(vec![])
+}
+
+/// Heals the trie given its state_root by fetching any missing nodes in it via p2p
+async fn heal_state_trie(
+    bytecode_sender: Sender<Vec<H256>>,
+    state_root: H256,
+    store: Store,
+    peers: Arc<Mutex<KademliaTable>>,
+) -> Result<bool, SyncError> {
+    // Spawn a storage healer for this block's storage
+    let (storage_sender, storage_receiver) = mpsc::channel::<Vec<H256>>(500);
+    let storage_healer_handle = tokio::spawn(storage_healer(
+        state_root,
+        storage_receiver,
+        peers.clone(),
+        store.clone(),
+    ));
+    // Begin by requesting the root node
+    let mut paths = vec![Nibbles::default()];
+    // Count the number of request retries so we don't get stuck requesting old state
+    let mut retry_count = 0;
+    while !paths.is_empty() && retry_count < MAX_RETRIES {
+        let peer = peers.lock().await.get_peer_channels().await;
+        if let Some(nodes) = peer
+            .request_state_trienodes(state_root, paths.clone())
+            .await
+        {
+            // Reset retry counter for next request
+            retry_count = 0;
+            let mut hashed_addresses = vec![];
+            let mut code_hashes = vec![];
+            // For each fetched node:
+            // - Add its children to the queue (if we don't have them already)
+            // - If it is a leaf, request its bytecode & storage
+            // - Add it to the trie's state
+            for node in nodes {
+                let path = paths.remove(0);
+                // We cannot keep the trie state open
+                let mut trie = store.open_state_trie(*EMPTY_TRIE_HASH);
+                let trie_state = trie.state_mut();
+                paths.extend(node_missing_children(&node, &path, trie_state)?);
+                if let Node::Leaf(node) = &node {
+                    // Fetch bytecode & storage
+                    let account = AccountState::decode(&node.value)?;
+                    // By now we should have the full path = account hash
+                    let path = &path.concat(node.partial.clone()).to_bytes();
+                    if path.len() != 32 {
+                        // Something went wrong
+                        return Err(SyncError::CorruptPath);
+                    }
+                    let account_hash = H256::from_slice(path);
+                    if account.storage_root != *EMPTY_TRIE_HASH
+                        && !store.contains_storage_node(account_hash, account.storage_root)?
+                    {
+                        hashed_addresses.push(account_hash);
+                    }
+                    if account.code_hash != *EMPTY_KECCACK_HASH
+                        && store.get_account_code(account.code_hash)?.is_none()
+                    {
+                        code_hashes.push(account.code_hash);
+                    }
+                }
+                let hash = node.compute_hash();
+                trie_state.write_node(node, hash)?;
+            }
+            // Send storage & bytecode requests
+            if !hashed_addresses.is_empty() {
+                storage_sender.send(hashed_addresses).await?;
+            }
+            if !code_hashes.is_empty() {
+                bytecode_sender.send(code_hashes).await?;
+            }
+        } else {
+            retry_count += 1;
+        }
+    }
+    // Send empty batch to signal that no more batches are incoming
+    storage_sender.send(vec![]).await?;
+    storage_healer_handle.await??;
+    Ok(retry_count < MAX_RETRIES)
+}
+
+/// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval
+/// Also retrieves their child nodes until we have the full storage trie stored
+async fn storage_healer(
+    state_root: H256,
+    mut receiver: Receiver<Vec<H256>>,
+    peers: Arc<Mutex<KademliaTable>>,
+    store: Store,
+) -> Result<(), SyncError> {
+    const BATCH_SIZE: usize = 200;
+    // Pending list of storages to heal
+    let mut pending_storages: Vec<(H256, Nibbles)> = vec![];
+    let mut incoming = true;
+    while incoming {
+        // Fetch incoming requests
+        match receiver.recv().await {
+            Some(account_paths) if !account_paths.is_empty() => {
+                // Add the root paths of each account trie to the queue
+                pending_storages.extend(
+                    account_paths
+                        .into_iter()
+                        .map(|acc_path| (acc_path, Nibbles::default())),
+                );
+            }
+            // Disconnect / Empty message signaling no more storages to heal
+            _ => incoming = false,
+        }
+        // If we have enough pending storages to fill a batch
+        // or if we have no more incoming batches, spawn a fetch process
+        while pending_storages.len() >= BATCH_SIZE || !incoming && !pending_storages.is_empty() {
+            let mut next_batch: BTreeMap<H256, Vec<Nibbles>> = BTreeMap::new();
+            // Group pending storages by account path
+            // We do this here instead of keeping them sorted so we don't prioritize further nodes from the first tries
+            for (account, path) in pending_storages.drain(..BATCH_SIZE.min(pending_storages.len()))
+            {
+                next_batch.entry(account).or_default().push(path);
+            }
+            let return_batch =
+                heal_storage_batch(state_root, next_batch, peers.clone(), store.clone()).await?;
+            for (acc_path, paths) in return_batch {
+                for path in paths {
+                    pending_storages.push((acc_path, path));
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
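A small standalone sketch of the grouping step inside `storage_healer`, with `u8` stand-ins for account hashes and node paths:

```rust
use std::collections::BTreeMap;

// Sketch: regroup a drained batch of (account, path) pairs per account so a
// single GetTrieNodes request can carry several paths for the same storage trie.
fn group_batch(pending: &mut Vec<(u8, u8)>, batch_size: usize) -> BTreeMap<u8, Vec<u8>> {
    let mut batch: BTreeMap<u8, Vec<u8>> = BTreeMap::new();
    for (account, path) in pending.drain(..batch_size.min(pending.len())) {
        batch.entry(account).or_default().push(path);
    }
    batch
}

fn main() {
    let mut pending = vec![(1, 10), (2, 20), (1, 11)];
    let batch = group_batch(&mut pending, 3);
    assert_eq!(batch[&1], vec![10, 11]);
    assert_eq!(batch[&2], vec![20]);
    assert!(pending.is_empty());
}
```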
+/// Receives a set of storage trie paths (grouped by their corresponding account's state trie path),
+/// fetches their respective nodes, stores them, and returns their children paths and the paths that couldn't be fetched so they can be returned to the queue
+async fn heal_storage_batch(
+    state_root: H256,
+    mut batch: BTreeMap<H256, Vec<Nibbles>>,
+    peers: Arc<Mutex<KademliaTable>>,
+    store: Store,
+) -> Result<BTreeMap<H256, Vec<Nibbles>>, SyncError> {
+    for _ in 0..MAX_RETRIES {
+        let peer = peers.lock().await.get_peer_channels().await;
+        if let Some(mut nodes) = peer
+            .request_storage_trienodes(state_root, batch.clone())
+            .await
+        {
+            debug!("Received {} nodes", nodes.len());
+            // Process the nodes for each account path
+            for (acc_path, paths) in batch.iter_mut() {
+                let mut trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH);
+                let trie_state = trie.state_mut();
+                // Get the corresponding nodes
+                for node in nodes.drain(..paths.len().min(nodes.len())) {
+                    let path = paths.remove(0);
+                    // Add children to batch
+                    let children = node_missing_children(&node, &path, trie_state)?;
+                    paths.extend(children);
+                    // Add node to the state
+                    let hash = node.compute_hash();
+                    trie_state.write_node(node, hash)?;
+                }
+                // Cut the loop if we ran out of nodes
+                if nodes.is_empty() {
+                    break;
+                }
+            }
+            // Return remaining and added paths to be added to the queue
+            return Ok(batch);
+        }
+    }
+    // This is a corner case where we fetched an account range for a block but the chain has moved on and the block
+    // was dropped by the peer's snapshot. We will keep the fetcher alive to avoid errors and stop fetching from the next account onwards
+    Ok(BTreeMap::new())
+}
+
+/// Returns the partial paths to the node's children if they are not already part of the trie state
+fn node_missing_children(
+    node: &Node,
+    parent_path: &Nibbles,
+    trie_state: &TrieState,
+) -> Result<Vec<Nibbles>, TrieError> {
+    let mut paths = Vec::new();
+    match &node {
+        Node::Branch(node) => {
+            for (index, child) in node.choices.iter().enumerate() {
+                if child.is_valid() && trie_state.get_node(child.clone())?.is_none() {
+                    paths.push(parent_path.append_new(index as u8));
+                }
+            }
+        }
+        Node::Extension(node) => {
+            if node.child.is_valid() && trie_state.get_node(node.child.clone())?.is_none() {
+                paths.push(parent_path.concat(node.prefix.clone()));
+            }
+        }
+        _ => {}
+    }
+    Ok(paths)
+}
 
 #[derive(thiserror::Error, Debug)]
@@ -501,7 +716,15 @@
 enum SyncError {
     #[error(transparent)]
     Store(#[from] StoreError),
     #[error(transparent)]
-    SendBytecode(#[from] SendError<Vec<H256>>),
+    SendHashes(#[from] SendError<Vec<H256>>),
     #[error(transparent)]
     SendStorage(#[from] SendError<Vec<(H256, H256)>>),
+    #[error(transparent)]
+    Trie(#[from] TrieError),
+    #[error(transparent)]
+    Rlp(#[from] RLPDecodeError),
+    #[error("Corrupt path during state healing")]
+    CorruptPath,
+    #[error(transparent)]
+    JoinHandle(#[from] tokio::task::JoinError),
 }
diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs
index 695ffddf0..c03228885 100644
--- a/crates/storage/store/storage.rs
+++ b/crates/storage/store/storage.rs
@@ -942,11 +942,20 @@ impl Store {
             .update_payload(payload_id, block, block_value, blobs_bundle, completed)
     }
 
+    pub fn get_receipts_for_block(
+        &self,
+        block_hash: &BlockHash,
+    ) -> Result<Vec<Receipt>, StoreError> {
+        self.engine.get_receipts_for_block(block_hash)
+    }
+
     /// Creates a new state trie with an empty state root, for testing purposes only
     pub fn new_state_trie_for_test(&self) -> Trie {
         self.engine.open_state_trie(*EMPTY_TRIE_HASH)
     }
 
+    /// Methods exclusive for trie management during snap-syncing
+
     // Obtain a state trie from the given state root
     // Doesn't check if the state root is valid
     pub fn open_state_trie(&self, state_root: H256) -> Trie {
@@ -959,11 +968,28 @@
         self.engine.open_storage_trie(account_hash, storage_root)
     }
 
-    pub fn get_receipts_for_block(
+    /// Returns true if the given node is part of the state trie's internal storage
+    pub fn contains_state_node(&self, node_hash: H256) -> Result<bool, StoreError> {
+        // Root is irrelevant, we only care about the internal state
+        Ok(self
+            .open_state_trie(*EMPTY_TRIE_HASH)
+            .state()
+            .get_node(node_hash.into())?
+            .is_some())
+    }
+
+    /// Returns true if the given node is part of the given storage trie's internal storage
+    pub fn contains_storage_node(
         &self,
-        block_hash: &BlockHash,
-    ) -> Result<Vec<Receipt>, StoreError> {
-        self.engine.get_receipts_for_block(block_hash)
+        hashed_address: H256,
+        node_hash: H256,
+    ) -> Result<bool, StoreError> {
+        // Root is irrelevant, we only care about the internal state
+        Ok(self
+            .open_storage_trie(hashed_address, *EMPTY_TRIE_HASH)
+            .state()
+            .get_node(node_hash.into())?
+            .is_some())
     }
 }
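A sketch of the storage-skipping rule these helpers enable during snap sync; the hardcoded hash is keccak256 of an empty RLP string, i.e. the empty trie root (the real constant lives in `ethrex_trie`):

```rust
/// keccak256(rlp("")) - the root hash of an empty trie
const EMPTY_TRIE_HASH: [u8; 32] = [
    0x56, 0xe8, 0x1f, 0x17, 0x1b, 0xcc, 0x55, 0xa6, 0xff, 0x83, 0x45, 0xe6, 0x92, 0xc0, 0xf8,
    0x6e, 0x5b, 0x48, 0xe0, 0x1b, 0x99, 0x6c, 0xad, 0xc0, 0x01, 0x62, 0x2f, 0xb5, 0xe3, 0x63,
    0xb4, 0x21,
];

// Sketch: an account's storage only needs fetching if it has storage at all
// (non-empty root) and we don't already hold that root node (an unchanged trie
// keeps its root hash, so `contains_storage_node` would report it as known).
fn needs_storage_fetch(storage_root: [u8; 32], node_already_stored: bool) -> bool {
    storage_root != EMPTY_TRIE_HASH && !node_already_stored
}

fn main() {
    assert!(!needs_storage_fetch(EMPTY_TRIE_HASH, false)); // no storage at all
    assert!(!needs_storage_fetch([0xaa; 32], true)); // storage unchanged
    assert!(needs_storage_fetch([0xaa; 32], false)); // needs fetching
}
```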
diff --git a/crates/storage/trie/nibbles.rs b/crates/storage/trie/nibbles.rs
index 6b7a91d3c..0eb2c58ab 100644
--- a/crates/storage/trie/nibbles.rs
+++ b/crates/storage/trie/nibbles.rs
@@ -8,7 +8,7 @@ use ethrex_rlp::{
 };
 
 /// Struct representing a list of nibbles (half-bytes)
-#[derive(Debug, Clone, Default, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Nibbles {
     data: Vec<u8>,
 }
@@ -76,6 +76,7 @@ impl Nibbles {
     }
 
     /// Removes and returns the first nibble
+    #[allow(clippy::should_implement_trait)]
     pub fn next(&mut self) -> Option<u8> {
         (!self.is_empty()).then(|| self.data.remove(0))
     }
@@ -177,6 +178,20 @@
             })
             .collect::<Vec<u8>>()
     }
+
+    /// Concatenates self and another Nibbles returning a new Nibbles
+    pub fn concat(&self, other: Nibbles) -> Nibbles {
+        Nibbles {
+            data: [self.data.clone(), other.data].concat(),
+        }
+    }
+
+    /// Returns a copy of self with the nibble added at the end
+    pub fn append_new(&self, nibble: u8) -> Nibbles {
+        Nibbles {
+            data: [self.data.clone(), vec![nibble]].concat(),
+        }
+    }
 }
 
 impl AsRef<[u8]> for Nibbles {
diff --git a/crates/storage/trie/node.rs b/crates/storage/trie/node.rs
index 92c634a46..d6bf0b4a5 100644
--- a/crates/storage/trie/node.rs
+++ b/crates/storage/trie/node.rs
@@ -165,6 +165,15 @@ impl Node {
             }
         })
     }
+
+    /// Computes the node's hash
+    pub fn compute_hash(&self) -> NodeHash {
+        match self {
+            Node::Branch(n) => n.compute_hash(),
+            Node::Extension(n) => n.compute_hash(),
+            Node::Leaf(n) => n.compute_hash(),
+        }
+    }
 }
 
 fn decode_child(rlp: &[u8]) -> NodeHash {
diff --git a/crates/storage/trie/state.rs b/crates/storage/trie/state.rs
index 77fe4710e..bc849de63 100644
--- a/crates/storage/trie/state.rs
+++ b/crates/storage/trie/state.rs
@@ -91,4 +91,13 @@ impl TrieState {
 
         Ok(())
     }
+
+    /// Writes a node directly to the DB bypassing the cache
+    pub fn write_node(&mut self, node: Node, hash: NodeHash) -> Result<(), TrieError> {
+        // Don't insert the node if it is already inlined on the parent
+        if matches!(hash, NodeHash::Hashed(_)) {
+            self.db.put(hash.into(), node.encode_to_vec())?;
+        }
+        Ok(())
+    }
 }
diff --git a/crates/storage/trie/trie.rs b/crates/storage/trie/trie.rs
index 067d75d01..ee61b639a 100644
--- a/crates/storage/trie/trie.rs
+++ b/crates/storage/trie/trie.rs
@@ -11,8 +11,6 @@ mod trie_iter;
 mod verify_range;
 use ethereum_types::H256;
 use ethrex_rlp::constants::RLP_NULL;
-use nibbles::Nibbles;
-use node::Node;
 use node_hash::NodeHash;
 use sha3::{Digest, Keccak256};
 use std::collections::HashSet;
@@ -21,10 +19,12 @@ use std::collections::HashSet;
 pub use self::db::{libmdbx::LibmdbxTrieDB, libmdbx_dupsort::LibmdbxDupsortTrieDB};
 pub use self::db::{in_memory::InMemoryTrieDB, TrieDB};
+pub use self::nibbles::Nibbles;
 pub use self::verify_range::verify_range;
+pub use self::{node::Node, state::TrieState};
 
 pub use self::error::TrieError;
-use self::{node::LeafNode, state::TrieState, trie_iter::TrieIterator};
+use self::{node::LeafNode, trie_iter::TrieIterator};
 
 use lazy_static::lazy_static;
@@ -320,6 +320,18 @@
         }
     }
 
+    /// Returns a mutable reference to the trie's internal node state
+    /// [WARNING] This will allow directly manipulating the trie's state and
+    /// may lead to inconsistent trie structures if not used responsibly
+    pub fn state_mut(&mut self) -> &mut TrieState {
+        &mut self.state
+    }
+
+    /// Returns a reference to the trie's internal node state
+    pub fn state(&mut self) -> &TrieState {
+        &self.state
+    }
+
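For intuition, the two path helpers reimplemented standalone: healing extends a parent path by one nibble for a branch child (`append_new`) and by the node's whole prefix for an extension child (`concat`). A sketch, not the crate's code:

```rust
// Standalone re-implementation of the two Nibbles helpers added above.
#[derive(Debug, Clone, Default, PartialEq)]
struct Nibbles {
    data: Vec<u8>,
}

impl Nibbles {
    /// Concatenates self and another Nibbles returning a new Nibbles
    fn concat(&self, other: Nibbles) -> Nibbles {
        Nibbles {
            data: [self.data.clone(), other.data].concat(),
        }
    }

    /// Returns a copy of self with the nibble added at the end
    fn append_new(&self, nibble: u8) -> Nibbles {
        Nibbles {
            data: [self.data.clone(), vec![nibble]].concat(),
        }
    }
}

fn main() {
    let parent = Nibbles { data: vec![0x0a, 0x0b] };
    // Branch child at index 1: parent path plus one nibble
    assert_eq!(parent.append_new(0x01).data, vec![0x0a, 0x0b, 0x01]);
    // Extension child: parent path plus the extension's prefix
    let prefix = Nibbles { data: vec![0x0c, 0x0d] };
    assert_eq!(parent.concat(prefix).data, vec![0x0a, 0x0b, 0x0c, 0x0d]);
}
```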
     #[cfg(all(test, feature = "libmdbx"))]
     /// Creates a new Trie based on a temporary Libmdbx DB
     fn new_temp() -> Self {