diff --git a/crates/networking/p2p/peer_channels.rs b/crates/networking/p2p/peer_channels.rs index 4a722805e..2f6ae9717 100644 --- a/crates/networking/p2p/peer_channels.rs +++ b/crates/networking/p2p/peer_channels.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use bytes::Bytes; use ethrex_core::{ @@ -6,7 +6,8 @@ use ethrex_core::{ H256, U256, }; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::verify_range; +use ethrex_trie::Nibbles; +use ethrex_trie::{verify_range, Node}; use tokio::sync::{mpsc, Mutex}; use crate::{ @@ -15,7 +16,8 @@ use crate::{ BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT, }, snap::{ - AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, StorageRanges, + AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, + StorageRanges, TrieNodes, }, }, snap::encodable_to_proof, @@ -318,4 +320,112 @@ impl PeerChannels { } Some((storage_keys, storage_values, should_continue)) } + + /// Requests state trie nodes given the root of the trie where they are contained and their path (be them full or partial) + /// Returns the nodes or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - The response timed out + /// - The response was empty or not valid + pub async fn request_state_trienodes( + &self, + state_root: H256, + paths: Vec, + ) -> Option> { + let request_id = rand::random(); + let expected_nodes = paths.len(); + let request = RLPxMessage::GetTrieNodes(GetTrieNodes { + id: request_id, + root_hash: state_root, + // [acc_path, acc_path,...] -> [[acc_path], [acc_path]] + paths: paths + .into_iter() + .map(|vec| vec![Bytes::from(vec.encode_compact())]) + .collect(), + bytes: MAX_RESPONSE_BYTES, + }); + self.sender.send(request).await.ok()?; + let mut receiver = self.receiver.lock().await; + let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => { + return Some(nodes) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok()??; + (!nodes.is_empty() && nodes.len() <= expected_nodes) + .then(|| { + nodes + .iter() + .map(|node| Node::decode_raw(node)) + .collect::, _>>() + .ok() + }) + .flatten() + } + + /// Requests storage trie nodes given the root of the state trie where they are contained and + /// a hashmap mapping the path to the account in the state trie (aka hashed address) to the paths to the nodes in its storage trie (can be full or partial) + /// Returns the nodes or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - The response timed out + /// - The response was empty or not valid + pub async fn request_storage_trienodes( + &self, + state_root: H256, + paths: BTreeMap>, + ) -> Option> { + let request_id = rand::random(); + let expected_nodes = paths.iter().fold(0, |acc, item| acc + item.1.len()); + let request = RLPxMessage::GetTrieNodes(GetTrieNodes { + id: request_id, + root_hash: state_root, + // {acc_path: [path, path, ...]} -> [[acc_path, path, path, ...]] + paths: paths + .into_iter() + .map(|(acc_path, paths)| { + [ + vec![Bytes::from(acc_path.0.to_vec())], + paths + .into_iter() + .map(|path| Bytes::from(path.encode_compact())) + .collect(), + ] + .concat() + }) + .collect(), + bytes: MAX_RESPONSE_BYTES, + }); + self.sender.send(request).await.ok()?; + let mut receiver = self.receiver.lock().await; + let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => { + return Some(nodes) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok()??; + (!nodes.is_empty() && nodes.len() <= expected_nodes) + .then(|| { + nodes + .iter() + .map(|node| Node::decode_raw(node)) + .collect::, _>>() + .ok() + }) + .flatten() + } } diff --git a/crates/networking/p2p/rlpx/snap.rs b/crates/networking/p2p/rlpx/snap.rs index 8d56438a9..55b465f32 100644 --- a/crates/networking/p2p/rlpx/snap.rs +++ b/crates/networking/p2p/rlpx/snap.rs @@ -450,7 +450,7 @@ impl RLPDecode for StorageSlot { let decoder = Decoder::new(rlp)?; let (hash, decoder) = decoder.decode_field("hash")?; let (data, decoder) = decoder.get_encoded_item()?; - let data = U256::decode(ethrex_rlp::decode::decode_bytes(&data)?.0)?; + let data = U256::decode(ðrex_rlp::decode::decode_bytes(&data)?.0)?; Ok((Self { hash, data }, decoder.finish()?)) } } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 8efc7cb4e..1b5838019 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -1,13 +1,12 @@ -use std::sync::Arc; - use ethrex_blockchain::error::ChainError; use ethrex_core::{ - types::{Block, BlockHash, BlockHeader, EMPTY_KECCACK_HASH}, + types::{AccountState, Block, BlockHash, BlockHeader, EMPTY_KECCACK_HASH}, H256, }; -use ethrex_rlp::encode::RLPEncode; +use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{error::StoreError, Store}; -use ethrex_trie::EMPTY_TRIE_HASH; +use ethrex_trie::{Nibbles, Node, TrieError, TrieState, EMPTY_TRIE_HASH}; +use std::{collections::BTreeMap, sync::Arc}; use tokio::{ sync::{ mpsc::{self, error::SendError, Receiver, Sender}, @@ -19,6 +18,10 @@ use tracing::{debug, info, warn}; use crate::kademlia::KademliaTable; +/// Maximum amount of times we will ask a peer for an account/storage range +/// If the max amount of retries is exceeded we will asume that the state we are requesting is old and no longer available +const MAX_RETRIES: usize = 10; + #[derive(Debug)] pub enum SyncMode { Full, @@ -116,16 +119,18 @@ impl SyncManager { // snap-sync: launch tasks to fetch blocks and state in parallel // - Fetch each block's state via snap p2p requests // - Fetch each blocks and its receipts via eth p2p requests - // TODO: We are currently testing against our implementation that doesn't hold an independant snapshot and can provide all historic state - // We should fetch all available state and then resort to state healing to fetch the rest let (bytecode_sender, bytecode_receiver) = mpsc::channel::>(500); - let mut set = tokio::task::JoinSet::new(); - set.spawn(bytecode_fetcher( + // First set of tasks will be in charge of fetching all available state (state not older than 128 blocks) + let mut sync_set = tokio::task::JoinSet::new(); + // Second state of tasks will be active during the healing phase where we fetch the state we weren't able to in the first phase + let mut healing_set = tokio::task::JoinSet::new(); + // We need the bytecode fetcher to be active during healing too + healing_set.spawn(bytecode_fetcher( bytecode_receiver, self.peers.clone(), store.clone(), )); - set.spawn(fetch_blocks_and_receipts( + sync_set.spawn(fetch_blocks_and_receipts( all_block_hashes.clone(), self.peers.clone(), store.clone(), @@ -134,8 +139,8 @@ impl SyncManager { .iter() .map(|header| header.state_root) .collect::>(); - set.spawn(fetch_snap_state( - bytecode_sender, + sync_set.spawn(fetch_snap_state( + bytecode_sender.clone(), state_roots.clone(), self.peers.clone(), store.clone(), @@ -151,9 +156,26 @@ impl SyncManager { store.add_block_header(hash, header)?; } // If all processes failed then they are likely to have a common cause (such as unaccessible storage), so return the first error - for result in set.join_all().await { + for result in sync_set.join_all().await { result?; } + // Start state healing + info!("Starting state healing"); + let start_time = Instant::now(); + healing_set.spawn(state_healing( + bytecode_sender, + state_roots.clone(), + store.clone(), + self.peers.clone(), + )); + for result in healing_set.join_all().await { + result?; + } + info!( + "State healing finished in {} seconds", + start_time.elapsed().as_secs() + ); + // Set latest block number here to avoid reading state that is currently being synced store.update_latest_block_number(latest_block_number)?; } @@ -251,27 +273,30 @@ async fn fetch_snap_state( // Fetch newer state first: This will be useful to detect where to switch to healing for state_root in state_roots.into_iter().rev() { // TODO: maybe spawn taks here instead of awaiting - rebuild_state_trie( + if !rebuild_state_trie( bytecode_sender.clone(), state_root, peers.clone(), store.clone(), ) .await? + { + // If we reached the maximum number of retries then the state we are fetching is most probably old and no longer part of our peer's snapshots + // We should give up on fetching this and older block's state and instead begin state healing + break; + } } - // We finished syncing the available state, lets make the fetcher processes aware - // Send empty batches to signal that no more batches are incoming - bytecode_sender.send(vec![]).await?; Ok(()) } /// Rebuilds a Block's state trie by requesting snap state from peers +/// Returns true if all state was fetched or false if the block is too old and the state is no longer available async fn rebuild_state_trie( bytecode_sender: Sender>, state_root: H256, peers: Arc>, store: Store, -) -> Result<(), SyncError> { +) -> Result { // Spawn a storage fetcher for this blocks's storage let (storage_sender, storage_receiver) = mpsc::channel::>(500); let storage_fetcher_handler = tokio::spawn(storage_fetcher( @@ -285,13 +310,18 @@ async fn rebuild_state_trie( // We cannot keep an open trie here so we will track the root between lookups let mut current_state_root = *EMPTY_TRIE_HASH; // Fetch Account Ranges - loop { + // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available + // In that case we will delegate the work to state healing + let mut retry_count = 0; + while retry_count < MAX_RETRIES { let peer = peers.clone().lock().await.get_peer_channels().await; debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}"); if let Some((account_hashes, accounts, should_continue)) = peer .request_account_range(state_root, start_account_hash) .await { + // Reset retry counter for the following batch (if needed) + retry_count = 0; // Update starting hash for next batch if should_continue { start_account_hash = *account_hashes.last().unwrap(); @@ -308,10 +338,10 @@ async fn rebuild_state_trie( code_hashes.push(account.code_hash) } // Build the batch of hashes and roots to send to the storage fetcher - // Ignore accounts without storage - // TODO: We could also check if the account's storage root is already part of the trie - // Aka, if the account was not changed shouldn't fetch the state we already have - if account.storage_root != *EMPTY_TRIE_HASH { + // Ignore accounts without storage and account's which storage hasn't changed from our current stored state + if account.storage_root != *EMPTY_TRIE_HASH + && !store.contains_storage_node(*account_hash, account.storage_root)? + { account_hashes_and_storage_roots.push((*account_hash, account.storage_root)); } } @@ -337,18 +367,19 @@ async fn rebuild_state_trie( // All accounts fetched! break; } + } else { + retry_count += 1; } } - if current_state_root != state_root { - warn!("State sync failed for state root {state_root}"); + if current_state_root == state_root { + debug!("Completed state sync for state root {state_root}"); } // Send empty batch to signal that no more batches are incoming storage_sender.send(vec![]).await?; storage_fetcher_handler .await .map_err(|_| StoreError::Custom(String::from("Failed to join storage_fetcher task")))??; - debug!("Completed state sync for state root {state_root}"); - Ok(()) + Ok(retry_count == MAX_RETRIES) } /// Waits for incoming code hashes from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches @@ -358,34 +389,27 @@ async fn bytecode_fetcher( store: Store, ) -> Result<(), SyncError> { const BATCH_SIZE: usize = 200; - // Pending list of bytecodes to fetch let mut pending_bytecodes: Vec = vec![]; - loop { + let mut incoming = true; + while incoming { + // Fetch incoming requests match receiver.recv().await { Some(code_hashes) if !code_hashes.is_empty() => { - // Add hashes to the queue pending_bytecodes.extend(code_hashes); - // If we have enought pending bytecodes to fill a batch, spawn a fetch process - while pending_bytecodes.len() >= BATCH_SIZE { - let next_batch = pending_bytecodes.drain(..BATCH_SIZE).collect::>(); - let remaining = - fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?; - // Add unfeched bytecodes back to the queue - pending_bytecodes.extend(remaining); - } } // Disconnect / Empty message signaling no more bytecodes to sync - _ => break, + _ => incoming = false, + } + // If we have enough pending bytecodes to fill a batch + // or if we have no more incoming batches, spawn a fetch process + while pending_bytecodes.len() >= BATCH_SIZE || !incoming && !pending_bytecodes.is_empty() { + let next_batch = pending_bytecodes + .drain(..BATCH_SIZE.min(pending_bytecodes.len())) + .collect::>(); + let remaining = fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?; + // Add unfeched bytecodes back to the queue + pending_bytecodes.extend(remaining); } - } - // We have no more incoming requests, process the remaining batches - while !pending_bytecodes.is_empty() { - let next_batch = pending_bytecodes - .drain(..BATCH_SIZE.min(pending_bytecodes.len())) - .collect::>(); - let remaining = fetch_bytecode_batch(next_batch, peers.clone(), store.clone()).await?; - // Add unfeched bytecodes back to the queue - pending_bytecodes.extend(remaining); } Ok(()) } @@ -418,38 +442,31 @@ async fn storage_fetcher( state_root: H256, ) -> Result<(), StoreError> { const BATCH_SIZE: usize = 100; - // Pending list of bytecodes to fetch + // Pending list of storages to fetch let mut pending_storage: Vec<(H256, H256)> = vec![]; // TODO: Also add a queue for storages that were incompletely fecthed, // but for the first iteration we will asume not fully fetched -> fetch again - loop { + let mut incoming = true; + while incoming { + // Fetch incoming requests match receiver.recv().await { - Some(account_and_root) if !account_and_root.is_empty() => { - // Add hashes to the queue - pending_storage.extend(account_and_root); - // If we have enought pending bytecodes to fill a batch, spawn a fetch process - while pending_storage.len() >= BATCH_SIZE { - let next_batch = pending_storage.drain(..BATCH_SIZE).collect::>(); - let remaining = - fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()) - .await?; - // Add unfeched bytecodes back to the queue - pending_storage.extend(remaining); - } + Some(account_hashes_and_roots) if !account_hashes_and_roots.is_empty() => { + pending_storage.extend(account_hashes_and_roots); } // Disconnect / Empty message signaling no more bytecodes to sync - _ => break, + _ => incoming = false, + } + // If we have enough pending bytecodes to fill a batch + // or if we have no more incoming batches, spawn a fetch process + while pending_storage.len() >= BATCH_SIZE || !incoming && !pending_storage.is_empty() { + let next_batch = pending_storage + .drain(..BATCH_SIZE.min(pending_storage.len())) + .collect::>(); + let remaining = + fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()).await?; + // Add unfeched bytecodes back to the queue + pending_storage.extend(remaining); } - } - // We have no more incoming requests, process the remaining batches - while !pending_storage.is_empty() { - let next_batch = pending_storage - .drain(..BATCH_SIZE.min(pending_storage.len())) - .collect::>(); - let remaining = - fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()).await?; - // Add unfeched bytecodes back to the queue - pending_storage.extend(remaining); } Ok(()) } @@ -461,7 +478,7 @@ async fn fetch_storage_batch( peers: Arc>, store: Store, ) -> Result, StoreError> { - loop { + for _ in 0..MAX_RETRIES { let peer = peers.lock().await.get_peer_channels().await; let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip(); if let Some((mut keys, mut values, incomplete)) = peer @@ -492,6 +509,220 @@ async fn fetch_storage_batch( return Ok(batch); } } + // This is a corner case where we fetched an account range for a block but the chain has moved on and the block + // was dropped by the peer's snapshot. We will keep the fetcher alive to avoid errors and stop fetching as from the next block + Ok(vec![]) +} + +async fn state_healing( + bytecode_sender: Sender>, + state_roots: Vec, + store: Store, + peers: Arc>, +) -> Result<(), SyncError> { + for state_root in state_roots { + // If we don't have the root node stored then we must fetch it + if !store.contains_state_node(state_root)? { + heal_state_trie( + bytecode_sender.clone(), + state_root, + store.clone(), + peers.clone(), + ) + .await?; + } + } + // We finished both sync & healing, lets make the bytecode fetcher process aware + // Send empty batches to signal that no more batches are incoming + bytecode_sender.send(vec![]).await?; + Ok(()) +} + +async fn heal_state_trie( + bytecode_sender: Sender>, + state_root: H256, + store: Store, + peers: Arc>, +) -> Result<(), SyncError> { + // Spawn a storage healer for this blocks's storage + let (storage_sender, storage_receiver) = mpsc::channel::>(500); + let storage_healer_handler = tokio::spawn(storage_healer( + state_root, + storage_receiver, + peers.clone(), + store.clone(), + )); + // Begin by requesting the root node + let mut paths = vec![Nibbles::default()]; + while !paths.is_empty() { + let peer = peers.lock().await.get_peer_channels().await; + if let Some(nodes) = peer + .request_state_trienodes(state_root, paths.clone()) + .await + { + let mut hahsed_addresses = vec![]; + let mut code_hashes = vec![]; + // For each fetched node: + // - Add its children to the queue (if we don't have them already) + // - If it is a leaf, request its bytecode & storage + // - Add it to the trie's state + for node in nodes { + let path = paths.remove(0); + // We cannot keep the trie state open + let mut trie = store.open_state_trie(*EMPTY_TRIE_HASH); + let trie_state = trie.state_mut(); + paths.extend(node_missing_children(&node, &path, &trie_state)?); + if let Node::Leaf(node) = &node { + // Fetch bytecode & storage + let account = AccountState::decode(&node.value)?; + // By now we should have the full path = account hash + let path = &path.concat(node.partial.clone()).to_bytes(); + if path.len() != 32 { + // Something went wrong + return Err(SyncError::CorruptPath); + } + let account_hash = H256::from_slice(&path); + if account.storage_root != *EMPTY_TRIE_HASH + && !store.contains_storage_node(account_hash, account.storage_root)? + { + hahsed_addresses.push(account_hash); + } + if account.code_hash != *EMPTY_KECCACK_HASH + && store.get_account_code(account.code_hash)?.is_none() + { + code_hashes.push(account.code_hash); + } + } + let hash = node.compute_hash(); + trie_state.write_node(node, hash)?; + } + // Send storage & bytecode requests + if !hahsed_addresses.is_empty() { + storage_sender.send(hahsed_addresses).await?; + } + if !code_hashes.is_empty() { + bytecode_sender.send(code_hashes).await?; + } + } + } + // Send empty batch to signal that no more batches are incoming + storage_sender.send(vec![]).await?; + storage_healer_handler + .await + .map_err(|_| StoreError::Custom(String::from("Failed to join storage_handler task")))??; + Ok(()) +} + +/// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval +/// Also retrieves their children nodes until we have the full storage trie stored +async fn storage_healer( + state_root: H256, + mut receiver: Receiver>, + peers: Arc>, + store: Store, +) -> Result<(), SyncError> { + const BATCH_SIZE: usize = 200; + // Pending list of bytecodes to fetch + let mut pending_storages: Vec<(H256, Nibbles)> = vec![]; + let mut incoming = true; + while incoming { + // Fetch incoming requests + match receiver.recv().await { + Some(account_paths) if !account_paths.is_empty() => { + // Add the root paths of each account trie to the queue + pending_storages.extend( + account_paths + .into_iter() + .map(|acc_path| (acc_path, Nibbles::default())), + ); + } + // Disconnect / Empty message signaling no more bytecodes to sync + _ => incoming = false, + } + // If we have enough pending storages to fill a batch + // or if we have no more incoming batches, spawn a fetch process + while pending_storages.len() >= BATCH_SIZE || !incoming && !pending_storages.is_empty() { + let mut next_batch: BTreeMap> = BTreeMap::new(); + // Group pending storages by account path + // We do this here instead of keeping them sorted so we don't prioritize further nodes from the first tries + for (account, path) in pending_storages.drain(..BATCH_SIZE.min(pending_storages.len())) { + next_batch.entry(account).or_default().push(path); + } + let return_batch = + heal_storage_batch(state_root, next_batch, peers.clone(), store.clone()).await?; + for (acc_path, paths) in return_batch { + for path in paths { + pending_storages.push((acc_path, path)); + } + } + } + } + Ok(()) +} + +/// Receives a set of storage trie paths (grouped by their corresponding account's state trie path), +/// fetches their respective nodes, stores them, and returns their children paths and the paths that couldn't be fetched so they can be returned to the queue +async fn heal_storage_batch( + state_root: H256, + mut batch: BTreeMap>, + peers: Arc>, + store: Store, +) -> Result>, StoreError> { + loop { + let peer = peers.lock().await.get_peer_channels().await; + if let Some(mut nodes) = peer + .request_storage_trienodes(state_root, batch.clone()) + .await + { + debug!("Received {} nodes", nodes.len()); + // Process the nodes for each account path + for (acc_path, paths) in batch.iter_mut() { + let mut trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH); + let trie_state = trie.state_mut(); + // Get the corresponding nodes + for node in nodes.drain(..paths.len().min(nodes.len())) { + let path = paths.remove(0); + // Add children to batch + let children = node_missing_children(&node, &path, trie_state)?; + paths.extend(children); + // Add node to the state + let hash = node.compute_hash(); + trie_state.write_node(node, hash)?; + } + // Cut the loop if we ran out of nodes + if nodes.is_empty() { + break; + } + } + // Return remaining and added paths to be added to the queue + return Ok(batch); + } + } +} + +/// Returns the partial paths to the node's children if they are not already part of the trie state +fn node_missing_children( + node: &Node, + parent_path: &Nibbles, + trie_state: &TrieState, +) -> Result, TrieError> { + let mut paths = Vec::new(); + match &node { + Node::Branch(node) => { + for (index, child) in node.choices.iter().enumerate() { + if child.is_valid() && trie_state.get_node(child.clone())?.is_none() { + paths.push(parent_path.append_new(index as u8)); + } + } + } + Node::Extension(node) => { + if node.child.is_valid() && trie_state.get_node(node.child.clone())?.is_none() { + paths.push(parent_path.concat(node.prefix.clone())); + } + } + _ => {} + } + Ok(paths) } #[derive(thiserror::Error, Debug)] @@ -501,7 +732,13 @@ enum SyncError { #[error(transparent)] Store(#[from] StoreError), #[error(transparent)] - SendBytecode(#[from] SendError>), + SendHashes(#[from] SendError>), #[error(transparent)] SendStorage(#[from] SendError>), + #[error(transparent)] + Trie(#[from] TrieError), + #[error(transparent)] + RLP(#[from] RLPDecodeError), + #[error("Corrupt path during state healing")] + CorruptPath, } diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index d647f9467..93e721456 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -945,6 +945,8 @@ impl Store { self.engine.open_state_trie(*EMPTY_TRIE_HASH) } + /// Methods exclusive for trie management during snap-syncing + // Obtain a state trie from the given state root // Doesn't check if the state root is valid pub fn open_state_trie(&self, state_root: H256) -> Trie { @@ -956,6 +958,30 @@ impl Store { pub fn open_storage_trie(&self, account_hash: H256, storage_root: H256) -> Trie { self.engine.open_storage_trie(account_hash, storage_root) } + + /// Returns true if the given node is part of the state trie's internal storage + pub fn contains_state_node(&self, node_hash: H256) -> Result { + // Root is irrelevant, we only care about the internal state + Ok(self + .open_state_trie(*EMPTY_TRIE_HASH) + .state() + .get_node(node_hash.into())? + .is_some()) + } + + /// Returns true if the given node is part of the given storage trie's internal storage + pub fn contains_storage_node( + &self, + hashed_address: H256, + node_hash: H256, + ) -> Result { + // Root is irrelevant, we only care about the internal state + Ok(self + .open_storage_trie(hashed_address, *EMPTY_TRIE_HASH) + .state() + .get_node(node_hash.into())? + .is_some()) + } } pub fn hash_address(address: &Address) -> Vec { diff --git a/crates/storage/trie/nibbles.rs b/crates/storage/trie/nibbles.rs index 6b7a91d3c..300776713 100644 --- a/crates/storage/trie/nibbles.rs +++ b/crates/storage/trie/nibbles.rs @@ -8,7 +8,7 @@ use ethrex_rlp::{ }; /// Struct representing a list of nibbles (half-bytes) -#[derive(Debug, Clone, Default, PartialEq)] +#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct Nibbles { data: Vec, } @@ -177,6 +177,18 @@ impl Nibbles { }) .collect::>() } + + pub fn concat(&self, other: Nibbles) -> Nibbles { + Nibbles { + data: [self.data.clone(), other.data].concat(), + } + } + + pub fn append_new(&self, nibble: u8) -> Nibbles { + Nibbles { + data: [self.data.clone(), vec![nibble]].concat(), + } + } } impl AsRef<[u8]> for Nibbles { diff --git a/crates/storage/trie/node.rs b/crates/storage/trie/node.rs index 92c634a46..d6bf0b4a5 100644 --- a/crates/storage/trie/node.rs +++ b/crates/storage/trie/node.rs @@ -165,6 +165,15 @@ impl Node { } }) } + + /// Computes the node's hash + pub fn compute_hash(&self) -> NodeHash { + match self { + Node::Branch(n) => n.compute_hash(), + Node::Extension(n) => n.compute_hash(), + Node::Leaf(n) => n.compute_hash(), + } + } } fn decode_child(rlp: &[u8]) -> NodeHash { diff --git a/crates/storage/trie/state.rs b/crates/storage/trie/state.rs index 77fe4710e..bc849de63 100644 --- a/crates/storage/trie/state.rs +++ b/crates/storage/trie/state.rs @@ -91,4 +91,13 @@ impl TrieState { Ok(()) } + + /// Writes a node directly to the DB bypassing the cache + pub fn write_node(&mut self, node: Node, hash: NodeHash) -> Result<(), TrieError> { + // Don't insert the node if it is already inlined on the parent + if matches!(hash, NodeHash::Hashed(_)) { + self.db.put(hash.into(), node.encode_to_vec())?; + } + Ok(()) + } } diff --git a/crates/storage/trie/trie.rs b/crates/storage/trie/trie.rs index 067d75d01..ee61b639a 100644 --- a/crates/storage/trie/trie.rs +++ b/crates/storage/trie/trie.rs @@ -11,8 +11,6 @@ mod trie_iter; mod verify_range; use ethereum_types::H256; use ethrex_rlp::constants::RLP_NULL; -use nibbles::Nibbles; -use node::Node; use node_hash::NodeHash; use sha3::{Digest, Keccak256}; use std::collections::HashSet; @@ -21,10 +19,12 @@ use std::collections::HashSet; pub use self::db::{libmdbx::LibmdbxTrieDB, libmdbx_dupsort::LibmdbxDupsortTrieDB}; pub use self::db::{in_memory::InMemoryTrieDB, TrieDB}; +pub use self::nibbles::Nibbles; pub use self::verify_range::verify_range; +pub use self::{node::Node, state::TrieState}; pub use self::error::TrieError; -use self::{node::LeafNode, state::TrieState, trie_iter::TrieIterator}; +use self::{node::LeafNode, trie_iter::TrieIterator}; use lazy_static::lazy_static; @@ -320,6 +320,18 @@ impl Trie { } } + /// Returns a mutable reference to the trie's internal node state + /// [WARNING] This will allow directly manipulating the trie's state and + /// may lead to inconsistent trie structures if not used resposibly + pub fn state_mut(&mut self) -> &mut TrieState { + &mut self.state + } + + /// Returns a reference to the trie's internal node state + pub fn state(&mut self) -> &TrieState { + &self.state + } + #[cfg(all(test, feature = "libmdbx"))] /// Creates a new Trie based on a temporary Libmdbx DB fn new_temp() -> Self {