Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new method to collect changes from all tx in a block #257

Merged
merged 7 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ importer-download *args="":
cargo run --bin importer-download -- --postgres {{postgres_url}} --external-rpc {{testnet_url}} {{args}}

# Importer: Import downloaded external RPC blocks to Stratus storage
importer-import:
cargo run --bin importer-import --release -- --postgres {{postgres_url}} --storage inmemory
importer-import *args="":
cargo run --bin importer-import --release -- --postgres {{postgres_url}} {{args}}

# ------------------------------------------------------------------------------
# Test tasks
Expand Down
10 changes: 5 additions & 5 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,16 @@ impl EthExecutor {
to = ?transaction.to,
data_len = %transaction.input.len(),
data = %transaction.input,
"executing real transaction"
"executing transaction"
);

// validate
// validates
if transaction.signer.is_zero() {
tracing::warn!("rejecting transaction from zero address");
return Err(anyhow!("Transaction sent from zero address is not allowed."));
return Err(anyhow!("transaction sent from zero address is not allowed."));
}

//creates a block and performs the necessary notifications
// creates a block and performs the necessary notifications
self.mine_and_execute_transaction(transaction).await
}

Expand All @@ -201,7 +201,7 @@ impl EthExecutor {
}

async fn mine_and_execute_transaction(&self, transaction: TransactionInput) -> anyhow::Result<Execution> {
// execute transaction until no more conflicts
// executes transaction until no more conflicts
// TODO: must have a stop condition like timeout or max number of retries
let (execution, block) = loop {
// execute and check conflicts before mining block
Expand Down
32 changes: 32 additions & 0 deletions src/eth/primitives/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
//! structure, such as querying block information or broadcasting newly mined
//! blocks.

use std::collections::HashMap;

use ethereum_types::H256;
use ethers_core::types::Block as EthersBlock;
use ethers_core::types::Transaction as EthersTransaction;
use itertools::Itertools;
use serde_json::Value as JsonValue;

use crate::eth::primitives::Address;
use crate::eth::primitives::BlockHeader;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExecutionAccountChanges;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::Hash;
Expand Down Expand Up @@ -72,6 +76,34 @@ impl Block {
pub fn hash(&self) -> &Hash {
&self.header.hash
}

/// Compact block execution changes removing all intermediate changes, keeping only the last value for each modified nonce, balance, bytecode and slot.
pub fn compact_execution_changes(&self) -> Vec<ExecutionAccountChanges> {
let mut block_compacted_changes: HashMap<Address, ExecutionAccountChanges> = HashMap::new();
for transaction in &self.transactions {
for transaction_changes in transaction.execution.changes.clone().into_iter() {
let account_compacted_changes = block_compacted_changes
.entry(transaction_changes.address.clone())
.or_insert(transaction_changes.clone());
if let Some(nonce) = transaction_changes.nonce.take_modified() {
account_compacted_changes.nonce.set_modified(nonce);
}
if let Some(balance) = transaction_changes.balance.take_modified() {
account_compacted_changes.balance.set_modified(balance);
}
if let Some(bytecode) = transaction_changes.bytecode.take_modified() {
account_compacted_changes.bytecode.set_modified(bytecode);
}
for (slot_index, slot) in transaction_changes.slots {
let slot_compacted_changes = account_compacted_changes.slots.entry(slot_index).or_insert(slot.clone());
if let Some(slot_value) = slot.take_modified() {
slot_compacted_changes.set_modified(slot_value);
}
}
}
}
block_compacted_changes.into_values().collect_vec()
}
}

// -----------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion src/eth/primitives/execution_account_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ impl ExecutionAccountChanges {
}

/// Creates a new [`ExecutionAccountChanges`] that represents an account being created by this transaction.
pub fn from_new_account(account: Account, modified_slots: Vec<Slot>) -> Self {
pub fn from_new_account(account: impl Into<Account>, modified_slots: Vec<Slot>) -> Self {
let account = account.into();
let mut changes = Self {
new_account: true,
address: account.address,
Expand Down
251 changes: 124 additions & 127 deletions src/eth/storage/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl PermanentStorage for Postgres {
i64::try_from(block.header.number).context("failed to convert block number")?,
block.header.hash.as_ref(),
block.header.transactions_root.as_ref(),
BigDecimal::try_from(block.header.gas)?,
BigDecimal::try_from(block.header.gas.clone())?,
block.header.bloom.as_ref(),
i64::try_from(block.header.timestamp).context("failed to convert block timestamp")?,
block.header.parent_hash.as_ref()
Expand All @@ -457,6 +457,8 @@ impl PermanentStorage for Postgres {
.await
.context("failed to insert block")?;

let account_changes = block.compact_execution_changes();

for transaction in block.transactions {
let is_success = transaction.is_success();
let to = <[u8; 20]>::from(*transaction.input.to.unwrap_or_default());
Expand Down Expand Up @@ -484,132 +486,6 @@ impl PermanentStorage for Postgres {
.await
.context("failed to insert transaction")?;

for change in transaction.execution.changes {
let (original_nonce, new_nonce) = change.nonce.take_both();
let (original_balance, new_balance) = change.balance.take_both();

let new_nonce: Option<BigDecimal> = match new_nonce {
Some(nonce) => Some(nonce.try_into()?),
None => None,
};

let new_balance: Option<BigDecimal> = match new_balance {
Some(balance) => Some(balance.try_into()?),
None => None,
};

let original_nonce: BigDecimal = original_nonce.unwrap_or_default().try_into()?;
let original_balance: BigDecimal = original_balance.unwrap_or_default().try_into()?;

let bytecode = if is_success {
change
.bytecode
.take()
.unwrap_or_else(|| {
tracing::debug!("bytecode not set, defaulting to None");
None
})
.map(|val| val.as_ref().to_owned())
} else {
None
};

let block_number = i64::try_from(block.header.number).context("failed to convert block number")?;

let account_result: PgQueryResult = sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_account.sql",
change.address.as_ref(),
new_nonce.as_ref().unwrap_or(&original_nonce),
new_balance.as_ref().unwrap_or(&original_balance),
bytecode,
block_number,
original_nonce,
original_balance
)
.execute(&mut *tx)
.await
.context("failed to insert account")?;

// A successful insert/update with no conflicts will have one affected row
if account_result.rows_affected() != 1 {
tx.rollback().await.context("failed to rollback transaction")?;
let error: StorageError = StorageError::Conflict(ExecutionConflicts(nonempty![ExecutionConflict::Account {
address: change.address,
expected_balance: original_balance,
expected_nonce: original_nonce,
}]));
return Err(error);
}

if let Some(balance) = new_balance {
sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_balance.sql",
change.address.as_ref(),
balance,
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert balance")?;
}

if let Some(nonce) = new_nonce {
sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_nonce.sql",
change.address.as_ref(),
nonce,
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert nonce")?;
}

if is_success {
for (slot_idx, value) in change.slots {
let (original_value, val) = value.clone().take_both();
let idx: [u8; 32] = slot_idx.into();
let val: [u8; 32] = val.ok_or(anyhow::anyhow!("critical: no change for slot"))?.value.into(); // the or condition should never happen
let block_number = i64::try_from(block.header.number).context("failed to convert block number")?;
let original_value: [u8; 32] = original_value.unwrap_or_default().value.into();

let slot_result: PgQueryResult = sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_account_slot.sql",
&idx,
&val,
change.address.as_ref(),
block_number,
&original_value
)
.execute(&mut *tx)
.await
.context("failed to insert slot")?;

// A successful insert/update with no conflicts will have one affected row
if slot_result.rows_affected() != 1 {
tx.rollback().await.context("failed to rollback transaction")?;
let error: StorageError = StorageError::Conflict(ExecutionConflicts(nonempty![ExecutionConflict::PgSlot {
address: change.address,
slot: idx,
expected: original_value,
}]));
return Err(error);
}

sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_slot.sql",
&idx,
&val,
change.address.as_ref(),
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert slot to history")?;
}
}
}

if is_success {
for log in transaction.logs {
let addr = log.log.address.as_ref();
Expand Down Expand Up @@ -651,6 +527,127 @@ impl PermanentStorage for Postgres {
}
}

for change in account_changes {
// for change in transaction.execution.changes {
let (original_nonce, new_nonce) = change.nonce.take_both();
let (original_balance, new_balance) = change.balance.take_both();

let new_nonce: Option<BigDecimal> = match new_nonce {
Some(nonce) => Some(nonce.try_into()?),
None => None,
};

let new_balance: Option<BigDecimal> = match new_balance {
Some(balance) => Some(balance.try_into()?),
None => None,
};

let original_nonce: BigDecimal = original_nonce.unwrap_or_default().try_into()?;
let original_balance: BigDecimal = original_balance.unwrap_or_default().try_into()?;

let bytecode = change
.bytecode
.take()
.unwrap_or_else(|| {
tracing::debug!("bytecode not set, defaulting to None");
None
})
.map(|val| val.as_ref().to_owned());

let block_number = i64::try_from(block.header.number).context("failed to convert block number")?;

let account_result: PgQueryResult = sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_account.sql",
change.address.as_ref(),
new_nonce.as_ref().unwrap_or(&original_nonce),
new_balance.as_ref().unwrap_or(&original_balance),
bytecode,
block_number,
original_nonce,
original_balance
)
.execute(&mut *tx)
.await
.context("failed to insert account")?;

// A successful insert/update with no conflicts will have one affected row
if account_result.rows_affected() != 1 {
tx.rollback().await.context("failed to rollback transaction")?;
let error: StorageError = StorageError::Conflict(ExecutionConflicts(nonempty![ExecutionConflict::Account {
address: change.address,
expected_balance: original_balance,
expected_nonce: original_nonce,
}]));
return Err(error);
}

if let Some(balance) = new_balance {
sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_balance.sql",
change.address.as_ref(),
balance,
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert balance")?;
}

if let Some(nonce) = new_nonce {
sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_nonce.sql",
change.address.as_ref(),
nonce,
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert nonce")?;
}

for (slot_idx, value) in change.slots {
let (original_value, val) = value.clone().take_both();
let idx: [u8; 32] = slot_idx.into();
let val: [u8; 32] = val.ok_or(anyhow::anyhow!("critical: no change for slot"))?.value.into(); // the or condition should never happen
let block_number = i64::try_from(block.header.number).context("failed to convert block number")?;
let original_value: [u8; 32] = original_value.unwrap_or_default().value.into();

let slot_result: PgQueryResult = sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_account_slot.sql",
&idx,
&val,
change.address.as_ref(),
block_number,
&original_value
)
.execute(&mut *tx)
.await
.context("failed to insert slot")?;

// A successful insert/update with no conflicts will have one affected row
if slot_result.rows_affected() != 1 {
tx.rollback().await.context("failed to rollback transaction")?;
let error: StorageError = StorageError::Conflict(ExecutionConflicts(nonempty![ExecutionConflict::PgSlot {
address: change.address,
slot: idx,
expected: original_value,
}]));
return Err(error);
}

sqlx::query_file!(
"src/eth/storage/postgres/queries/insert_historical_slot.sql",
&idx,
&val,
change.address.as_ref(),
block_number
)
.execute(&mut *tx)
.await
.context("failed to insert slot to history")?;
}
}

tx.commit().await.context("failed to commit transaction")?;

Ok(())
Expand Down
Loading
Loading