feat(l1): fix snap sync + add healing (#1505)
**Motivation**

Fix snap sync logic: instead of rebuilding every block's state via snap, we
select a pivot block (sync head - 64), fetch its state via snap, and then
execute all blocks after it.
Add a healing phase.

Missing from this PR:
- Reorg handling
- Fetching receipts


**Description**

Closes #1455

---------

Co-authored-by: fkrause98 <[email protected]>
Co-authored-by: Martin Paulucci <[email protected]>
3 people authored Dec 30, 2024
1 parent f63bba3 commit 3398ac8
Showing 11 changed files with 620 additions and 168 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci_l1.yaml
@@ -181,6 +181,9 @@ jobs:
- name: "Paris Engine tests"
simulation: ethereum/engine
test_pattern: "engine-api/RPC|Re-Org Back to Canonical Chain From Syncing Chain|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao|Fork ID|Unknown|Invalid PayloadAttributes|Bad Hash|Unique Payload ID|Re-Execute Payload|In-Order|Multiple New Payloads|Valid NewPayload|NewPayload with|Invalid NewPayload|Payload Build|Invalid NewPayload, Transaction|ParentHash equals|Build Payload|Invalid Missing Ancestor ReOrg"
- name: "Sync"
simulation: ethereum/sync
test_pattern: ""
steps:
- name: Checkout sources
uses: actions/checkout@v4
32 changes: 21 additions & 11 deletions crates/networking/docs/Sync.md
@@ -2,23 +2,33 @@

## Snap Sync

A snap sync cycle begins by fetching all the block headers (via eth p2p) between the current head (latest canonical block) and the sync head (the block hash sent by a forkChoiceUpdate).

We will then fetch the block bodies for each fetched header and, at the same time, select a pivot block (sync head - 64) and start rebuilding its state via snap p2p requests. If the pivot becomes stale during this rebuild, we will select a newer pivot (the sync head) and restart the rebuild.
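
A minimal sketch of this pivot-selection loop; all names here (`fetch_snap_state`, `latest_sync_head`, `SnapError`) are hypothetical stand-ins, not the actual ethrex API:

```rust
const PIVOT_DISTANCE: u64 = 64;

enum SnapError {
    /// Peers no longer serve state for this block: the pivot went stale.
    StaleState,
}

async fn fetch_snap_state(_pivot: u64) -> Result<(), SnapError> {
    Ok(()) // placeholder for the snap p2p state rebuild
}

async fn latest_sync_head() -> u64 {
    0 // placeholder: the newest sync head sent by a forkChoiceUpdate
}

/// Rebuild state at `sync_head - 64`, retargeting whenever the pivot goes stale.
async fn sync_pivot_state(sync_head: u64) -> u64 {
    let mut pivot = sync_head.saturating_sub(PIVOT_DISTANCE);
    loop {
        match fetch_snap_state(pivot).await {
            Ok(()) => return pivot,
            Err(SnapError::StaleState) => pivot = latest_sync_head().await,
        }
    }
}
```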

After we have fully rebuilt the pivot state and fetched all the block bodies, we will fetch and store the receipts for the range between the current head and the pivot (inclusive), and at the same time store all blocks in the same range and execute all blocks after the pivot (as in full sync).
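
A rough sketch of this last step under assumed names (`Block`, `store_block`, `execute_block`); not the actual ethrex code:

```rust
struct Block {
    number: u64,
    // header, body, ...
}

fn store_block(_block: &Block) { /* persist the block and mark it canonical */ }
fn execute_block(_block: &Block) { /* run the EVM over the block, as full sync does */ }

fn apply_synced_blocks(blocks: &[Block], pivot_number: u64) {
    for block in blocks {
        // Every block in the range is stored; blocks at or before the pivot
        // are not executed, since their post-state was fetched via snap.
        store_block(block);
        if block.number > pivot_number {
            execute_block(block);
        }
    }
}
```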

This diagram illustrates the process described above:

![snap_sync](/crates/networking/docs/diagrams/snap_sync.jpg)

### Snap State Rebuild

During snap sync we need to fully rebuild the pivot block's state. We can divide this process into the initial sync phase and the healing phase.
For the first phase we will spawn two processes, the `bytecode_fetcher` and the `storage_fetcher`. Both remain active and listen for requests from the main rebuild process, which they queue and process in fixed-size batches (more on this later). The main process will then request the full range of accounts from the pivot block's state trie via p2p snap requests. For each obtained account range we will send the accounts' code hashes and storage roots to the `bytecode_fetcher` and `storage_fetcher` respectively for fetching. Once we have fetched all accounts (or the account state is no longer available), we will signal the `storage_fetcher` to finish all pending requests and move on to the next phase, while keeping the `bytecode_fetcher` active.
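
A simplified sketch of this first phase; the types and helpers (`AccountEntry`, `next_account_range`, the channel payloads) are illustrative assumptions, not the real ethrex API:

```rust
use tokio::sync::mpsc::Sender;

struct AccountEntry {
    hashed_address: [u8; 32],
    code_hash: [u8; 32],
    storage_root: [u8; 32],
}

const EMPTY_CODE_HASH: [u8; 32] = [0u8; 32]; // stand-in for keccak256("")
const EMPTY_STORAGE_ROOT: [u8; 32] = [0u8; 32]; // stand-in for the empty trie root hash

async fn next_account_range(_start: [u8; 32]) -> Option<Vec<AccountEntry>> {
    None // placeholder for a snap GetAccountRange request
}

async fn fetch_all_accounts(
    bytecode_tx: Sender<[u8; 32]>,
    storage_tx: Sender<([u8; 32], [u8; 32])>,
) {
    let mut start = [0u8; 32];
    while let Some(accounts) = next_account_range(start).await {
        for acc in &accounts {
            // Non-empty bytecodes and storage tries are queued for the fetchers.
            if acc.code_hash != EMPTY_CODE_HASH {
                let _ = bytecode_tx.send(acc.code_hash).await;
            }
            if acc.storage_root != EMPTY_STORAGE_ROOT {
                let _ = storage_tx.send((acc.hashed_address, acc.storage_root)).await;
            }
        }
        match accounts.last() {
            // Continue the range scan from the last account returned.
            Some(last) => start = last.hashed_address,
            None => break,
        }
    }
}
```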

In the healing phase we will spawn another queue-like process called the `storage_healer`, and we will begin requesting state trie nodes. We start by requesting the pivot block's state root node and proceed by requesting each node's children (if they are not already part of the state) until we have the full trie stored (i.e., all child nodes are known). For each fetched leaf node we will send its code hash to the `bytecode_fetcher` and its account hash to the `storage_healer`.

The `storage_healer` keeps a list of pending account hashes and paths. It adds new entries either by inserting the root node of an account's storage trie when it receives an account hash from the main process, or by adding the unknown children of nodes returned by peers.
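
A condensed sketch of the healing loop; paths are simplified to byte vectors and `request_trie_nodes` stands in for the snap `GetTrieNodes` request, so this is illustrative rather than the real implementation:

```rust
use std::collections::VecDeque;

type Path = Vec<u8>;

struct TrieNode;

impl TrieNode {
    fn is_leaf(&self) -> bool {
        false // placeholder
    }
    fn unknown_children(&self) -> Vec<Path> {
        Vec::new() // placeholder: children not yet present in our state
    }
}

async fn request_trie_nodes(_root: [u8; 32], _paths: &[Path]) -> Vec<TrieNode> {
    Vec::new() // placeholder for a snap GetTrieNodes request
}

async fn heal_state_trie(state_root: [u8; 32]) {
    // Start from the root node (empty path) and walk down the trie.
    let mut pending: VecDeque<Path> = VecDeque::from([Path::new()]);
    while !pending.is_empty() {
        let batch: Vec<Path> = pending.drain(..).collect();
        for node in request_trie_nodes(state_root, &batch).await {
            if node.is_leaf() {
                // Forward the leaf's code hash to the bytecode_fetcher and its
                // account hash to the storage_healer (elided here).
            } else {
                pending.extend(node.unknown_children());
            }
        }
    }
}
```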

This diagram illustrates the process described above:

![rebuild_state](/crates/networking/docs/diagrams/rebuild_state_trie.jpg)

To exemplify how these queue-like processes work, we will explain how the `bytecode_fetcher` works:

The `bytecode_fetcher` has its own channel where it receives code hashes from an active `rebuild_state_trie` process. Once a code hash is received, it is added to a pending queue. When the queue has enough entries for a full batch, the `bytecode_fetcher` requests a batch of bytecodes via snap p2p and stores them. If a bytecode could not be fetched by the request (i.e., we reached the response limit), it is added back to the pending queue. After the whole state is synced, `fetch_snap_state` sends an empty list to the `bytecode_fetcher` to signal the end of the requests, so it can request the last (incomplete) bytecode batch and end gracefully.
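
A distilled sketch of this queue-and-batch pattern; the batch size and the `request_bytecodes` helper are assumptions for illustration:

```rust
use tokio::sync::mpsc::Receiver;

const BATCH_SIZE: usize = 128; // assumed; the real limit may differ

async fn request_bytecodes(batch: &[[u8; 32]]) -> Vec<Vec<u8>> {
    // Placeholder for a snap GetByteCodes request; pretend every code was served.
    batch.iter().map(|_| Vec::new()).collect()
}

async fn bytecode_fetcher(mut rx: Receiver<Vec<[u8; 32]>>) {
    let mut pending: Vec<[u8; 32]> = Vec::new();
    let mut done = false;
    while !done || !pending.is_empty() {
        if !done {
            match rx.recv().await {
                // An empty message signals that no more code hashes will arrive.
                Some(hashes) if hashes.is_empty() => done = true,
                Some(hashes) => pending.extend(hashes),
                None => done = true,
            }
        }
        // Fire a request once a full batch is queued, or flush the final
        // (incomplete) batch once the end-of-requests signal has arrived.
        if pending.len() >= BATCH_SIZE || (done && !pending.is_empty()) {
            let n = pending.len().min(BATCH_SIZE);
            let batch: Vec<_> = pending.drain(..n).collect();
            let codes = request_bytecodes(&batch).await;
            // Hashes beyond the peer's response limit were not served:
            // put them back in the queue.
            pending.extend(batch.into_iter().skip(codes.len()));
            // Storing the fetched codes is elided.
        }
    }
}
```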

This diagram illustrates the process described above:

![bytecode_fetcher](/crates/networking/docs/diagrams/bytecode_fetcher.jpg)
Binary file modified crates/networking/docs/diagrams/snap_sync.jpg
157 changes: 151 additions & 6 deletions crates/networking/p2p/peer_channels.rs
@@ -1,21 +1,26 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use bytes::Bytes;
use ethrex_core::{
types::{AccountState, BlockBody, BlockHeader, Receipt},
H256, U256,
};
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::Nibbles;
use ethrex_trie::{verify_range, Node};
use tokio::sync::{mpsc, Mutex};

use crate::{
rlpx::{
eth::{
blocks::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
},
receipts::{GetReceipts, Receipts},
},
snap::{
AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes,
StorageRanges, TrieNodes,
},
},
snap::encodable_to_proof,
@@ -121,6 +126,38 @@ impl PeerChannels {
(!block_bodies.is_empty() && block_bodies.len() <= block_hashes_len).then_some(block_bodies)
}

/// Requests all receipts in a set of blocks from the peer given their block hashes
/// Returns the lists of receipts or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
pub async fn request_receipts(&self, block_hashes: Vec<H256>) -> Option<Vec<Vec<Receipt>>> {
let block_hashes_len = block_hashes.len();
let request_id = rand::random();
let request = RLPxMessage::GetReceipts(GetReceipts {
id: request_id,
block_hashes,
});
self.sender.send(request).await.ok()?;
let mut receiver = self.receiver.lock().await;
let receipts = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::Receipts(Receipts { id, receipts })) if id == request_id => {
return Some(receipts)
}
// Ignore replies that don't match the expected id (such as late responses)
Some(_) => continue,
None => return None,
}
}
})
.await
.ok()??;
// Check that the response is not empty and does not contain more receipt lists than the ones requested
(!receipts.is_empty() && receipts.len() <= block_hashes_len).then_some(receipts)
}
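
// Illustrative usage (a hedged sketch, not part of this commit): given a
// connected peer's `PeerChannels` handle, receipts can be fetched per block:
//
//     if let Some(receipts) = peer.request_receipts(block_hashes).await {
//         // receipts[i] holds the receipt list of the i-th requested block
//     }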

/// Requests an account range from the peer given the state trie's root and the starting hash (the limit hash will be the maximum value of H256)
/// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie
/// Returns the response message or None if:
@@ -318,4 +355,112 @@ impl PeerChannels {
}
Some((storage_keys, storage_values, should_continue))
}

/// Requests state trie nodes given the root of the trie where they are contained and their paths (be they full or partial)
/// Returns the nodes or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
pub async fn request_state_trienodes(
&self,
state_root: H256,
paths: Vec<Nibbles>,
) -> Option<Vec<Node>> {
let request_id = rand::random();
let expected_nodes = paths.len();
let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
id: request_id,
root_hash: state_root,
// [acc_path, acc_path,...] -> [[acc_path], [acc_path]]
paths: paths
.into_iter()
.map(|vec| vec![Bytes::from(vec.encode_compact())])
.collect(),
bytes: MAX_RESPONSE_BYTES,
});
self.sender.send(request).await.ok()?;
let mut receiver = self.receiver.lock().await;
let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => {
return Some(nodes)
}
// Ignore replies that don't match the expected id (such as late responses)
Some(_) => continue,
None => return None,
}
}
})
.await
.ok()??;
(!nodes.is_empty() && nodes.len() <= expected_nodes)
.then(|| {
nodes
.iter()
.map(|node| Node::decode_raw(node))
.collect::<Result<Vec<_>, _>>()
.ok()
})
.flatten()
}
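
// Illustrative usage (hedged sketch): healing code might request missing
// state trie nodes by path, e.g.
//
//     let paths = vec![Nibbles::default()]; // root node path (assumed constructor)
//     let nodes = peer.request_state_trienodes(state_root, paths).await;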

/// Requests storage trie nodes given the root of the state trie where they are contained and
/// a map from each account's path in the state trie (aka hashed address) to the paths of the nodes in its storage trie (full or partial)
/// Returns the nodes or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
pub async fn request_storage_trienodes(
&self,
state_root: H256,
paths: BTreeMap<H256, Vec<Nibbles>>,
) -> Option<Vec<Node>> {
let request_id = rand::random();
let expected_nodes = paths.iter().fold(0, |acc, item| acc + item.1.len());
let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
id: request_id,
root_hash: state_root,
// {acc_path: [path, path, ...]} -> [[acc_path, path, path, ...]]
paths: paths
.into_iter()
.map(|(acc_path, paths)| {
[
vec![Bytes::from(acc_path.0.to_vec())],
paths
.into_iter()
.map(|path| Bytes::from(path.encode_compact()))
.collect(),
]
.concat()
})
.collect(),
bytes: MAX_RESPONSE_BYTES,
});
self.sender.send(request).await.ok()?;
let mut receiver = self.receiver.lock().await;
let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => {
return Some(nodes)
}
// Ignore replies that don't match the expected id (such as late responses)
Some(_) => continue,
None => return None,
}
}
})
.await
.ok()??;
(!nodes.is_empty() && nodes.len() <= expected_nodes)
.then(|| {
nodes
.iter()
.map(|node| Node::decode_raw(node))
.collect::<Result<Vec<_>, _>>()
.ok()
})
.flatten()
}
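
// Encoding sketch for the request above (illustrative): a map such as
//     {acc_a: [p1, p2], acc_b: [p3]}
// is flattened into the GetTrieNodes paths field as
//     [[acc_a, p1, p2], [acc_b, p3]]
// where each account path is sent as its raw 32 bytes and the node paths
// are compact-encoded.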
}