Skip to content

Commit

Permalink
enha: check transaction index when importing block
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed Dec 11, 2024
1 parent 426da2f commit 2375226
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
30 changes: 29 additions & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,35 @@ fn run_block_saver(miner: Arc<Miner>, from_executor_rx: mpsc::Receiver<BlocksToS

async fn fetch_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpc>, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result<BlocksToExecute> {
tracing::info!(parent: None, %block_start, %block_end, "fetching blocks and receipts");
rpc_storage.read_block_and_receipts_in_range(block_start, block_end).await
let mut blocks = rpc_storage.read_block_and_receipts_in_range(block_start, block_end).await?;
for (block, receipts) in blocks.iter_mut() {
// Stably sort transactions and receipts by transaction_index
block.transactions.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

// perform additional checks on the transaction index
for window in block.transactions.windows(2) {
let tx_index = window[0].transaction_index.map_or(u32::MAX, |index| index.as_u32());
let next_tx_index = window[1].transaction_index.map_or(u32::MAX, |index| index.as_u32());
assert!(
tx_index + 1 == next_tx_index,
"two consecutive transactions must have consecutive indices: {} and {}",
tx_index,
next_tx_index,
);
}
for window in receipts.windows(2) {
let tx_index = window[0].transaction_index.as_u32();
let next_tx_index = window[1].transaction_index.as_u32();
assert!(
tx_index + 1 == next_tx_index,
"two consecutive receipts must have consecutive indices: {} and {}",
tx_index,
next_tx_index,
);
}
}
Ok(blocks)
}

async fn block_number_to_stop(rpc_storage: &Arc<dyn ExternalRpc>) -> anyhow::Result<BlockNumber> {
Expand Down
22 changes: 21 additions & 1 deletion src/eth/follower/importer/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,27 @@ impl Importer {

// keep fetching in order
let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
while let Some((block, receipts)) = tasks.next().await {
while let Some((mut block, mut receipts)) = tasks.next().await {
// Stably sort transactions and receipts by transaction_index
block.transactions.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));

// perform additional checks on the transaction index
for window in block.transactions.windows(2) {
let tx_index = window[0].transaction_index.map_or(u32::MAX, |index| index.as_u32());
let next_tx_index = window[1].transaction_index.map_or(u32::MAX, |index| index.as_u32());
if tx_index + 1 != next_tx_index {
tracing::error!(tx_index, next_tx_index, "two consecutive transactions must have consecutive indices");
}
}
for window in receipts.windows(2) {
let tx_index = window[0].transaction_index.as_u32();
let next_tx_index = window[1].transaction_index.as_u32();
if tx_index + 1 != next_tx_index {
tracing::error!(tx_index, next_tx_index, "two consecutive receipts must have consecutive indices");
}
}

if backlog_tx.send((block, receipts)).is_err() {
warn_task_rx_closed(TASK_NAME);
return Ok(());
Expand Down

0 comments on commit 2375226

Please sign in to comment.