From 43ca6efc05b125fc2a14d296f4d86d9b364047dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 28 Nov 2024 17:05:51 +0100 Subject: [PATCH 01/15] receive new pooled transaction hashes message --- crates/networking/p2p/rlpx/connection.rs | 4 ++++ crates/networking/p2p/rlpx/message.rs | 11 ++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index ca2200030..5824cc6ad 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -349,6 +349,10 @@ impl RLPxConnection { }; self.send(Message::BlockBodies(response)).await?; } + Message::NewPooledTransactionHashes(msg) if peer_supports_eth => { + // For now we always ask for the transactions we don't know. We might want to add checks to avoid + // repetitions in the future. + } Message::GetStorageRanges(req) => { let response = process_storage_ranges_request(req, self.storage.clone())?; self.send(Message::StorageRanges(response)).await? diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index d44f1b4ca..0bdbf99c1 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders}; use super::eth::status::StatusMessage; -use super::eth::transactions::Transactions; +use super::eth::transactions::{NewPooledTransactionHashes, Transactions}; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; use super::snap::{ AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, @@ -31,6 +31,7 @@ pub(crate) enum Message { Transactions(Transactions), GetBlockBodies(GetBlockBodies), BlockBodies(BlockBodies), + NewPooledTransactionHashes(NewPooledTransactionHashes), // snap capability GetAccountRange(GetAccountRange), AccountRange(AccountRange), @@ -64,6 +65,9 @@ impl Message { 0x13 => Ok(Message::GetBlockHeaders(GetBlockHeaders::decode(msg_data)?)), 0x14 => Ok(Message::BlockHeaders(BlockHeaders::decode(msg_data)?)), 0x15 => Ok(Message::GetBlockBodies(GetBlockBodies::decode(msg_data)?)), + 0x18 => Ok(Message::NewPooledTransactionHashes( + NewPooledTransactionHashes::decode(msg_data)?, + )), 0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)), 0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)), 0x23 => Ok(Message::GetStorageRanges(GetStorageRanges::decode( @@ -120,6 +124,10 @@ impl Message { 0x16_u8.encode(buf); msg.encode(buf) } + Message::NewPooledTransactionHashes(msg) => { + 0x18_u8.encode(buf); + msg.encode(buf) + } Message::GetAccountRange(msg) => { 0x21_u8.encode(buf); msg.encode(buf) @@ -167,6 +175,7 @@ impl Display for Message { Message::GetBlockHeaders(_) => "eth:getBlockHeaders".fmt(f), Message::BlockHeaders(_) => "eth:BlockHeaders".fmt(f), Message::BlockBodies(_) => "eth:BlockBodies".fmt(f), + Message::NewPooledTransactionHashes(_) => "eth:NewPooledTransactionHashes".fmt(f), Message::Transactions(_) => "eth:TransactionsMessage".fmt(f), Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f), From c2474e830c22dcf3e4b2529be9442ea178a3b9cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 2 Dec 2024 19:40:21 +0100 Subject: [PATCH 02/15] Add pooled transactions messages roundtrip --- crates/networking/p2p/rlpx/connection.rs | 14 ++++++++++--- .../networking/p2p/rlpx/eth/transactions.rs | 5 +++++ crates/networking/p2p/rlpx/message.rs | 11 +++++++++- crates/storage/store/storage.rs | 20 ++++++++++++++++++- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index fb0b77ad3..95c2dd157 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -21,6 +21,7 @@ use crate::{ use super::{ error::RLPxError, + eth::transactions::GetPooledTransactions, frame, handshake::{decode_ack_message, decode_auth_message, encode_auth_message}, message as rlpx, @@ -28,7 +29,7 @@ use super::{ utils::{ecdh_xchng, pubkey2id}, }; use aes::cipher::KeyIvInit; -use ethrex_blockchain::mempool; +use ethrex_blockchain::mempool::{self}; use ethrex_core::{H256, H512}; use ethrex_rlp::decode::RLPDecode; use ethrex_storage::Store; @@ -36,6 +37,7 @@ use k256::{ ecdsa::{RecoveryId, Signature, SigningKey, VerifyingKey}, PublicKey, SecretKey, }; +use rand::random; use sha3::{Digest, Keccak256}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -353,9 +355,15 @@ impl RLPxConnection { }; self.send(Message::BlockBodies(response)).await?; } - Message::NewPooledTransactionHashes(msg) if peer_supports_eth => { + Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) + if peer_supports_eth => + { // For now we always ask for the transactions we don't know. We might want to add checks to avoid - // repetitions in the future. + // repetitions in the future (that is, if there's already an ongoing request for any of those transactions). + let hashes = + new_pooled_transaction_hashes.get_transactions_to_request(&self.storage)?; + let request = GetPooledTransactions::new(random(), hashes); + self.send(Message::GetPooledTransactions(request)).await?; } Message::GetStorageRanges(req) => { let response = process_storage_ranges_request(req, self.storage.clone())?; diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 8ad8c627c..b7c39ab31 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -4,6 +4,7 @@ use ethrex_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; +use ethrex_storage::{error::StoreError, Store}; use crate::rlpx::{ message::RLPxMessage, @@ -93,6 +94,10 @@ impl NewPooledTransactionHashes { transaction_hashes, } } + + pub fn get_transactions_to_request(&self, storage: &Store) -> Result, StoreError> { + Ok(storage.get_unknown_transactions(&self.transaction_hashes)?) + } } impl RLPxMessage for NewPooledTransactionHashes { diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index 0bdbf99c1..4bf206638 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders}; use super::eth::status::StatusMessage; -use super::eth::transactions::{NewPooledTransactionHashes, Transactions}; +use super::eth::transactions::{GetPooledTransactions, NewPooledTransactionHashes, Transactions}; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; use super::snap::{ AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, @@ -32,6 +32,7 @@ pub(crate) enum Message { GetBlockBodies(GetBlockBodies), BlockBodies(BlockBodies), NewPooledTransactionHashes(NewPooledTransactionHashes), + GetPooledTransactions(GetPooledTransactions), // snap capability GetAccountRange(GetAccountRange), AccountRange(AccountRange), @@ -68,6 +69,9 @@ impl Message { 0x18 => Ok(Message::NewPooledTransactionHashes( NewPooledTransactionHashes::decode(msg_data)?, )), + 0x19 => Ok(Message::GetPooledTransactions( + GetPooledTransactions::decode(msg_data)?, + )), 0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)), 0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)), 0x23 => Ok(Message::GetStorageRanges(GetStorageRanges::decode( @@ -128,6 +132,10 @@ impl Message { 0x18_u8.encode(buf); msg.encode(buf) } + Message::GetPooledTransactions(msg) => { + 0x19_u8.encode(buf); + msg.encode(buf) + } Message::GetAccountRange(msg) => { 0x21_u8.encode(buf); msg.encode(buf) @@ -176,6 +184,7 @@ impl Display for Message { Message::BlockHeaders(_) => "eth:BlockHeaders".fmt(f), Message::BlockBodies(_) => "eth:BlockBodies".fmt(f), Message::NewPooledTransactionHashes(_) => "eth:NewPooledTransactionHashes".fmt(f), + Message::GetPooledTransactions(_) => "eth::GetPooledTransactions".fmt(f), Message::Transactions(_) => "eth:TransactionsMessage".fmt(f), Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f), diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index d6fbbe736..1a4dbdf79 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -15,7 +15,7 @@ use ethrex_rlp::encode::RLPEncode; use ethrex_trie::Trie; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::{Arc, Mutex}; use tracing::info; @@ -315,6 +315,24 @@ impl Store { Ok(txs_by_sender) } + /// Gets hashes from possible_hashes that are not already known in the mempool. + pub fn get_unknown_transactions( + &self, + possible_hashes: &Vec, + ) -> Result, StoreError> { + let mempool = self + .mempool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))?; + + let tx_set: HashSet<_> = mempool.iter().map(|(hash, _)| hash).collect(); + Ok(possible_hashes + .iter() + .filter(|hash| !tx_set.contains(hash)) + .map(|hash| *hash) + .collect()) + } + fn add_account_code(&self, code_hash: H256, code: Bytes) -> Result<(), StoreError> { self.engine.add_account_code(code_hash, code) } From 0b1e9c8eedcaa0d36f4790052e9ab850ed08ac05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 2 Dec 2024 20:08:03 +0100 Subject: [PATCH 03/15] fix linter errors --- crates/networking/p2p/rlpx/eth/transactions.rs | 2 +- crates/storage/store/storage.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index b7c39ab31..14079fc87 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -96,7 +96,7 @@ impl NewPooledTransactionHashes { } pub fn get_transactions_to_request(&self, storage: &Store) -> Result, StoreError> { - Ok(storage.get_unknown_transactions(&self.transaction_hashes)?) + storage.get_unknown_transactions(&self.transaction_hashes) } } diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index 1a4dbdf79..a404c26b2 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -318,7 +318,7 @@ impl Store { /// Gets hashes from possible_hashes that are not already known in the mempool. pub fn get_unknown_transactions( &self, - possible_hashes: &Vec, + possible_hashes: &[H256], ) -> Result, StoreError> { let mempool = self .mempool @@ -329,7 +329,7 @@ impl Store { Ok(possible_hashes .iter() .filter(|hash| !tx_set.contains(hash)) - .map(|hash| *hash) + .copied() .collect()) } From d3b503ba30db65e6570df6979e6a02e9627944d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 4 Dec 2024 15:59:16 +0100 Subject: [PATCH 04/15] Fix new pooled hashes decoding error, add new pooled txs hive test to ci --- .github/workflows/integration.yaml | 2 +- crates/networking/p2p/rlpx/connection.rs | 7 ++++--- crates/networking/p2p/rlpx/eth/transactions.rs | 8 ++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 40e1da3ef..a66675d54 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -84,7 +84,7 @@ jobs: run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="/AccountRange|StorageRanges|ByteCodes|TrieNodes" - simulation: eth name: "Devp2p eth tests" - run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs" + run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs" - simulation: engine name: "Engine Auth and EC tests" run_command: make run-hive-on-latest SIMULATION=ethereum/engine TEST_PATTERN="engine-(auth|exchange-capabilities)/" diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 95c2dd157..523a465a6 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -315,7 +315,7 @@ impl RLPxConnection { match message { Message::Disconnect(msg_data) => { debug!("Received Disconnect: {:?}", msg_data.reason); - // Returning a Disonnect error to be handled later at the call stack + // Returning a Disconnect error to be handled later at the call stack return Err(RLPxError::Disconnect()); } Message::Ping(_) => { @@ -358,8 +358,9 @@ impl RLPxConnection { Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => { - // For now we always ask for the transactions we don't know. We might want to add checks to avoid - // repetitions in the future (that is, if there's already an ongoing request for any of those transactions). + // For now we always ask for the transactions we don't know. We might want to add + // checks to avoid repetitions in the future (that is, if there's already an + // ongoing request for any of those transactions). let hashes = new_pooled_transaction_hashes.get_transactions_to_request(&self.storage)?; let request = GetPooledTransactions::new(random(), hashes); diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 14079fc87..c29e1e9b3 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -1,4 +1,5 @@ use bytes::BufMut; +use bytes::Bytes; use ethrex_core::{types::Transaction, H256}; use ethrex_rlp::{ error::{RLPDecodeError, RLPEncodeError}, @@ -65,7 +66,7 @@ impl RLPxMessage for Transactions { // Broadcast message #[derive(Debug)] pub(crate) struct NewPooledTransactionHashes { - transaction_types: Vec, + transaction_types: Bytes, transaction_sizes: Vec, transaction_hashes: Vec, } @@ -89,7 +90,7 @@ impl NewPooledTransactionHashes { transaction_hashes.push(transaction_hash); } Self { - transaction_types, + transaction_types: transaction_types.into(), transaction_sizes, transaction_hashes, } @@ -117,8 +118,7 @@ impl RLPxMessage for NewPooledTransactionHashes { fn decode(msg_data: &[u8]) -> Result { let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; - let (transaction_types, decoder): (Vec, _) = - decoder.decode_field("transactionTypes")?; + let (transaction_types, decoder): (Bytes, _) = decoder.decode_field("transactionTypes")?; let (transaction_sizes, decoder): (Vec, _) = decoder.decode_field("transactionSizes")?; let (transaction_hashes, _): (Vec, _) = decoder.decode_field("transactionHashes")?; From 13b7ea6af39113c9aaf3239fdd882ee2a8163aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 4 Dec 2024 17:37:30 +0100 Subject: [PATCH 05/15] bump github versions to comply with linter --- .github/workflows/integration.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index a66675d54..c3601ed05 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -103,7 +103,7 @@ jobs: docker load --input /tmp/ethrex_image.tar - name: Checkout sources - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Rustup toolchain install uses: dtolnay/rust-toolchain@stable @@ -111,7 +111,7 @@ jobs: toolchain: ${{ env.RUST_VERSION }} - name: Setup Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 - name: Run Hive Simulation run: ${{ matrix.run_command }} From bdffc385390211e9da0e0cdd71283be4bedb844f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 5 Dec 2024 15:05:07 +0100 Subject: [PATCH 06/15] merge with main, erase integration yaml --- .github/workflows/integration.yaml | 117 ----------------------------- 1 file changed, 117 deletions(-) delete mode 100644 .github/workflows/integration.yaml diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml deleted file mode 100644 index c3601ed05..000000000 --- a/.github/workflows/integration.yaml +++ /dev/null @@ -1,117 +0,0 @@ -name: Integration Test -on: - push: - branches: ["main"] - merge_group: - pull_request: - branches: ["**"] - -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} - cancel-in-progress: true - -env: - RUST_VERSION: 1.80.1 - -jobs: - docker_build: - name: Docker Build image - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Build Docker image - uses: docker/build-push-action@v5 - with: - context: . - file: ./Dockerfile - load: true - tags: ethrex - outputs: type=docker,dest=/tmp/ethrex_image.tar - - - name: Upload artifacts - uses: actions/upload-artifact@v4 - with: - name: ethrex_image - path: /tmp/ethrex_image.tar - - run-assertoor: - name: Assertoor - Stability Check - runs-on: ubuntu-latest - needs: [docker_build] - steps: - - uses: actions/checkout@v4 - - - name: Download artifacts - uses: actions/download-artifact@v4 - with: - name: ethrex_image - path: /tmp - - - name: Load image - run: | - docker load --input /tmp/ethrex_image.tar - - - name: Setup kurtosis testnet and run assertoor tests - uses: ethpandaops/kurtosis-assertoor-github-action@v1 - with: - kurtosis_version: "1.4.2" - ethereum_package_url: "github.com/lambdaclass/ethereum-package" - ethereum_package_branch: "ethrex-integration" - ethereum_package_args: "./test_data/network_params.yaml" - - run-hive: - name: Hive - ${{ matrix.name }} - runs-on: ubuntu-latest - needs: [docker_build] - strategy: - matrix: - include: - - simulation: rpc-compat - name: "Rpc Compat tests" - run_command: make run-hive-on-latest SIMULATION=ethereum/rpc-compat TEST_PATTERN="/eth_chainId|eth_getTransactionByBlockHashAndIndex|eth_getTransactionByBlockNumberAndIndex|eth_getCode|eth_getStorageAt|eth_call|eth_getTransactionByHash|eth_getBlockByHash|eth_getBlockByNumber|eth_createAccessList|eth_getBlockTransactionCountByNumber|eth_getBlockTransactionCountByHash|eth_getBlockReceipts|eth_getTransactionReceipt|eth_blobGasPrice|eth_blockNumber|ethGetTransactionCount|debug_getRawHeader|debug_getRawBlock|debug_getRawTransaction|debug_getRawReceipts|eth_estimateGas|eth_getBalance|eth_sendRawTransaction|eth_getProof|eth_getLogs" - - simulation: rpc-auth - name: "Engine Auth tests" - run_command: make run-hive-on-latest SIMULATION=ethereum/engine TEST_PATTERN="auth/engine-auth" - - simulation: discv4 - name: "Devp2p discv4 tests" - run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="discv4" - - simulation: snap - name: "Devp2p snap tests" - run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="/AccountRange|StorageRanges|ByteCodes|TrieNodes" - - simulation: eth - name: "Devp2p eth tests" - run_command: make run-hive-on-latest SIMULATION=devp2p TEST_PATTERN="eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs" - - simulation: engine - name: "Engine Auth and EC tests" - run_command: make run-hive-on-latest SIMULATION=ethereum/engine TEST_PATTERN="engine-(auth|exchange-capabilities)/" - - simulation: engine-cancun - name: "Cancun Engine tests" - run_command: make run-hive-on-latest SIMULATION=ethereum/engine HIVE_EXTRA_ARGS="--sim.parallelism 4" TEST_PATTERN="engine-cancun/Blob Transactions On Block 1|Blob Transaction Ordering, Single|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdatedV3 Modifies Payload ID on Different Beacon Root|NewPayloadV3 After Cancun|NewPayloadV3 Versioned Hashes|Incorrect BlobGasUsed|Bad Hash|ParentHash equals BlockHash|RPC:|in ForkchoiceState|Unknown|Invalid PayloadAttributes|Unique|ForkchoiceUpdated Version on Payload Request|Re-Execute Payload|In-Order Consecutive Payload|Multiple New Payloads|Valid NewPayload->|NewPayload with|Payload Build after|Build Payload with|Invalid Missing Ancestor ReOrg, StateRoot|Re-Org Back to|Re-org to Previously|Safe Re-Org to Side Chain|Transaction Re-Org, Re-Org Back In|Re-Org Back into Canonical Chain, Depth=5|Suggested Fee Recipient Test|PrevRandao Opcode|Invalid NewPayload, [^R][^e]|Fork ID Genesis=0, Cancun=0|Fork ID Genesis=0, Cancun=1|Fork ID Genesis=1, Cancun=0|Fork ID Genesis=1, Cancun=2, Shanghai=2" - steps: - - name: Download artifacts - uses: actions/download-artifact@v4 - with: - name: ethrex_image - path: /tmp - - - name: Load image - run: | - docker load --input /tmp/ethrex_image.tar - - - name: Checkout sources - uses: actions/checkout@v4 - - - name: Rustup toolchain install - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{ env.RUST_VERSION }} - - - name: Setup Go - uses: actions/setup-go@v5 - - - name: Run Hive Simulation - run: ${{ matrix.run_command }} From 4ae8b37642dff2a620ecdcd72f20cc2991bfd2e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 5 Dec 2024 15:07:12 +0100 Subject: [PATCH 07/15] readd pooled tx test --- .github/workflows/ci_l1.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_l1.yaml b/.github/workflows/ci_l1.yaml index 3c495456f..9ebf98baf 100644 --- a/.github/workflows/ci_l1.yaml +++ b/.github/workflows/ci_l1.yaml @@ -157,7 +157,7 @@ jobs: test_pattern: /AccountRange|StorageRanges|ByteCodes|TrieNodes - name: "Devp2p eth tests" simulation: devp2p - test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs + test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs - name: "Engine Auth and EC tests" simulation: ethereum/engine test_pattern: engine-(auth|exchange-capabilities)/ From a3d9b3818d0037095f1a2c8ca590f00659b196de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 5 Dec 2024 17:23:59 +0100 Subject: [PATCH 08/15] address comments --- crates/networking/p2p/rlpx/connection.rs | 6 +++--- crates/networking/p2p/rlpx/eth/transactions.rs | 2 +- crates/storage/store/storage.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index f10ac7e8a..0e97fcd26 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -382,11 +382,11 @@ impl RLPxConnection { Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => { - // For now we always ask for the transactions we don't know. We might want to add - // checks to avoid repetitions in the future (that is, if there's already an - // ongoing request for any of those transactions). + //TODO(#1415): evaluate keeping track of requests to avoid sending the same twice. let hashes = new_pooled_transaction_hashes.get_transactions_to_request(&self.storage)?; + + //TODO(#1416): Evaluate keeping track of the request-id. let request = GetPooledTransactions::new(random(), hashes); self.send(Message::GetPooledTransactions(request)).await?; } diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index b4fdd1626..0f8a614c4 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -97,7 +97,7 @@ impl NewPooledTransactionHashes { } pub fn get_transactions_to_request(&self, storage: &Store) -> Result, StoreError> { - storage.get_unknown_transactions(&self.transaction_hashes) + storage.filter_unknown_transactions(&self.transaction_hashes) } } diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index cd51177b5..aa169b777 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -326,7 +326,7 @@ impl Store { } /// Gets hashes from possible_hashes that are not already known in the mempool. - pub fn get_unknown_transactions( + pub fn filter_unknown_transactions( &self, possible_hashes: &[H256], ) -> Result, StoreError> { From f6e4f124a97f1c7b4910190418dac61d1dcc8233 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 9 Dec 2024 12:58:51 -0300 Subject: [PATCH 09/15] Add handling of GetPooledTransaction message --- crates/networking/p2p/rlpx/connection.rs | 9 ++++++++- .../networking/p2p/rlpx/eth/transactions.rs | 20 +++++++++++++++++++ crates/networking/p2p/rlpx/message.rs | 13 +++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 0e97fcd26..34aa6a779 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -22,7 +22,7 @@ use crate::{ use super::{ error::RLPxError, - eth::transactions::GetPooledTransactions, + eth::transactions::{GetPooledTransactions, PooledTransactions}, frame, handshake::{decode_ack_message, decode_auth_message, encode_auth_message}, message::{self as rlpx}, @@ -390,6 +390,13 @@ impl RLPxConnection { let request = GetPooledTransactions::new(random(), hashes); self.send(Message::GetPooledTransactions(request)).await?; } + // TODO: Also add handler for get pooled transactions. + Message::GetPooledTransactions(msg) => { + // We need to get the transactions for each hash and build a response with the same id. + let response = msg.handle(&self.storage)?; + self.send(Message::PooledTransactions(response)).await?; + } + Message::PooledTransactions(msg) if peer_supports_eth => {} Message::GetStorageRanges(req) => { let response = process_storage_ranges_request(req, self.storage.clone())?; self.send(Message::StorageRanges(response)).await? diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 0f8a614c4..b4338d8fa 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -156,6 +156,25 @@ impl GetPooledTransactions { id, } } + + pub fn handle(&self, store: &Store) -> Result { + let txs = self + .transaction_hashes + .iter() + .map(|hash| store.get_transaction_by_hash(*hash)) + // Return an error in case anything failed. + .collect::, _>>()? + .into_iter() + // As per the spec, Nones are perfectly acceptable, for example if a transaction was + // taken out of the mempool due to payload building after being advertised. + .flatten() + .collect(); + + Ok(PooledTransactions { + id: self.id, + pooled_transactions: txs, + }) + } } impl RLPxMessage for GetPooledTransactions { @@ -182,6 +201,7 @@ impl RLPxMessage for GetPooledTransactions { } // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#pooledtransactions-0x0a +#[derive(Debug)] pub(crate) struct PooledTransactions { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index 7176cf96b..3bb1941a5 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -5,7 +5,9 @@ use std::fmt::Display; use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders}; use super::eth::receipts::Receipts; use super::eth::status::StatusMessage; -use super::eth::transactions::{GetPooledTransactions, NewPooledTransactionHashes, Transactions}; +use super::eth::transactions::{ + GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, +}; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; use super::snap::{ AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, @@ -34,6 +36,7 @@ pub(crate) enum Message { BlockBodies(BlockBodies), NewPooledTransactionHashes(NewPooledTransactionHashes), GetPooledTransactions(GetPooledTransactions), + PooledTransactions(PooledTransactions), Receipts(Receipts), // snap capability GetAccountRange(GetAccountRange), @@ -75,6 +78,9 @@ impl Message { 0x19 => Ok(Message::GetPooledTransactions( GetPooledTransactions::decode(msg_data)?, )), + 0x1a => Ok(Message::PooledTransactions(PooledTransactions::decode( + msg_data, + )?)), 0x20 => Ok(Message::Receipts(Receipts::decode(msg_data)?)), 0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)), 0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)), @@ -140,6 +146,10 @@ impl Message { 0x19_u8.encode(buf); msg.encode(buf) } + Message::PooledTransactions(msg) => { + 0x1a_u8.encode(buf); + msg.encode(buf) + } Message::Receipts(msg) => { 0x20_u8.encode(buf); msg.encode(buf) @@ -193,6 +203,7 @@ impl Display for Message { Message::BlockBodies(_) => "eth:BlockBodies".fmt(f), Message::NewPooledTransactionHashes(_) => "eth:NewPooledTransactionHashes".fmt(f), Message::GetPooledTransactions(_) => "eth::GetPooledTransactions".fmt(f), + Message::PooledTransactions(_) => "eth::PooledTransactions".fmt(f), Message::Transactions(_) => "eth:TransactionsMessage".fmt(f), Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), Message::Receipts(_) => "eth:Receipts".fmt(f), From ee5b184b9750cc2d793d03bcc9afb9681056beee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Tue, 10 Dec 2024 16:32:13 -0300 Subject: [PATCH 10/15] wip: move p2p transactions type to common --- crates/common/types/transaction.rs | 49 +++++++++++++++++++ crates/networking/p2p/rlpx/connection.rs | 5 +- .../networking/p2p/rlpx/eth/transactions.rs | 19 +++++-- crates/networking/rpc/types/transaction.rs | 40 --------------- 4 files changed, 68 insertions(+), 45 deletions(-) diff --git a/crates/common/types/transaction.rs b/crates/common/types/transaction.rs index 33b8872a4..390237a27 100644 --- a/crates/common/types/transaction.rs +++ b/crates/common/types/transaction.rs @@ -17,6 +17,8 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; +use super::BlobsBundle; + // The `#[serde(untagged)]` attribute allows the `Transaction` enum to be serialized without // a tag indicating the variant type. This means that Serde will serialize the enum's variants // directly according to the structure of the variant itself. @@ -35,6 +37,53 @@ pub enum Transaction { PrivilegedL2Transaction(PrivilegedL2Transaction), } +#[derive(Debug)] +pub enum P2PTransaction { + LegacyTransaction(LegacyTransaction), + EIP2930Transaction(EIP2930Transaction), + EIP1559Transaction(EIP1559Transaction), + WrappedEIP4844Transaction(WrappedEIP4844Transaction), + PrivilegedL2Transaction(PrivilegedL2Transaction), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WrappedEIP4844Transaction { + pub tx: EIP4844Transaction, + pub blobs_bundle: BlobsBundle, +} + +impl RLPEncode for WrappedEIP4844Transaction { + fn encode(&self, buf: &mut dyn bytes::BufMut) { + let encoder = Encoder::new(buf); + encoder + .encode_field(&self.tx) + .encode_field(&self.blobs_bundle.blobs) + .encode_field(&self.blobs_bundle.commitments) + .encode_field(&self.blobs_bundle.proofs) + .finish(); + } +} + +impl RLPDecode for WrappedEIP4844Transaction { + fn decode_unfinished(rlp: &[u8]) -> Result<(WrappedEIP4844Transaction, &[u8]), RLPDecodeError> { + let decoder = Decoder::new(rlp)?; + let (tx, decoder) = decoder.decode_field("tx")?; + let (blobs, decoder) = decoder.decode_field("blobs")?; + let (commitments, decoder) = decoder.decode_field("commitments")?; + let (proofs, decoder) = decoder.decode_field("proofs")?; + + let wrapped = WrappedEIP4844Transaction { + tx, + blobs_bundle: BlobsBundle { + blobs, + commitments, + proofs, + }, + }; + Ok((wrapped, decoder.finish()?)) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Default)] pub struct LegacyTransaction { pub nonce: u64, diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 34aa6a779..aa6f745b1 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -392,11 +392,12 @@ impl RLPxConnection { } // TODO: Also add handler for get pooled transactions. Message::GetPooledTransactions(msg) => { - // We need to get the transactions for each hash and build a response with the same id. let response = msg.handle(&self.storage)?; self.send(Message::PooledTransactions(response)).await?; } - Message::PooledTransactions(msg) if peer_supports_eth => {} + Message::PooledTransactions(msg) if peer_supports_eth => { + msg.handle(&self.storage)?; + } Message::GetStorageRanges(req) => { let response = process_storage_ranges_request(req, self.storage.clone())?; self.send(Message::StorageRanges(response)).await? diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index b4338d8fa..1d273251d 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -1,5 +1,7 @@ use bytes::BufMut; use bytes::Bytes; +use ethrex_blockchain::mempool; +use ethrex_core::types::P2PTransaction; use ethrex_core::{types::Transaction, H256}; use ethrex_rlp::{ error::{RLPDecodeError, RLPEncodeError}, @@ -170,6 +172,7 @@ impl GetPooledTransactions { .flatten() .collect(); + // TODO: add getting of the blob bundle, as we'll implement this as a p2p transaction. Ok(PooledTransactions { id: self.id, pooled_transactions: txs, @@ -206,16 +209,26 @@ pub(crate) struct PooledTransactions { // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages id: u64, - pooled_transactions: Vec, + pooled_transactions: Vec, } impl PooledTransactions { - pub fn new(id: u64, pooled_transactions: Vec) -> Self { + pub fn new(id: u64, pooled_transactions: Vec) -> Self { Self { pooled_transactions, id, } } + + pub fn handle(&self, store: &Store) -> Result<(), StoreError> { + // We need to save all transactions, one by one, and we also need the senders. + for tx in self.pooled_transactions { + if let P2PTransaction::WrappedEIP4844Transaction() = tx { + mempool::add_blob_transaction(transaction, blobs_bundle, store) + } + } + Err(StoreError::Custom("Implement this plz".to_string())) + } } impl RLPxMessage for PooledTransactions { @@ -234,7 +247,7 @@ impl RLPxMessage for PooledTransactions { let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; - let (pooled_transactions, _): (Vec, _) = + let (pooled_transactions, _): (Vec, _) = decoder.decode_field("pooledTransactions")?; Ok(Self::new(id, pooled_transactions)) diff --git a/crates/networking/rpc/types/transaction.rs b/crates/networking/rpc/types/transaction.rs index df4b5a29c..d1980c71b 100644 --- a/crates/networking/rpc/types/transaction.rs +++ b/crates/networking/rpc/types/transaction.rs @@ -58,46 +58,6 @@ pub enum SendRawTransactionRequest { PriviligedL2(PrivilegedL2Transaction), } -// NOTE: We might move this transaction definitions to `core/types/transactions.rs` later on. - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct WrappedEIP4844Transaction { - pub tx: EIP4844Transaction, - pub blobs_bundle: BlobsBundle, -} - -impl RLPEncode for WrappedEIP4844Transaction { - fn encode(&self, buf: &mut dyn bytes::BufMut) { - let encoder = Encoder::new(buf); - encoder - .encode_field(&self.tx) - .encode_field(&self.blobs_bundle.blobs) - .encode_field(&self.blobs_bundle.commitments) - .encode_field(&self.blobs_bundle.proofs) - .finish(); - } -} - -impl RLPDecode for WrappedEIP4844Transaction { - fn decode_unfinished(rlp: &[u8]) -> Result<(WrappedEIP4844Transaction, &[u8]), RLPDecodeError> { - let decoder = Decoder::new(rlp)?; - let (tx, decoder) = decoder.decode_field("tx")?; - let (blobs, decoder) = decoder.decode_field("blobs")?; - let (commitments, decoder) = decoder.decode_field("commitments")?; - let (proofs, decoder) = decoder.decode_field("proofs")?; - - let wrapped = WrappedEIP4844Transaction { - tx, - blobs_bundle: BlobsBundle { - blobs, - commitments, - proofs, - }, - }; - Ok((wrapped, decoder.finish()?)) - } -} - impl SendRawTransactionRequest { pub fn to_transaction(&self) -> Transaction { match self { From de923b0b43adaaab626088fd13c5459a47364190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 11 Dec 2024 17:58:05 -0300 Subject: [PATCH 11/15] wip, p2p transaction to reg transaction --- crates/blockchain/mempool.rs | 2 +- crates/common/types/transaction.rs | 98 ++++++++++++++++++- .../networking/p2p/rlpx/eth/transactions.rs | 52 +++++++++- 3 files changed, 145 insertions(+), 7 deletions(-) diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index 9680f37b2..429757e80 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -22,7 +22,7 @@ use ethrex_storage::{error::StoreError, Store}; pub fn add_blob_transaction( transaction: EIP4844Transaction, blobs_bundle: BlobsBundle, - store: Store, + store: &Store, ) -> Result { // Validate blobs bundle blobs_bundle.validate(&transaction)?; diff --git a/crates/common/types/transaction.rs b/crates/common/types/transaction.rs index 390237a27..9887fec01 100644 --- a/crates/common/types/transaction.rs +++ b/crates/common/types/transaction.rs @@ -37,7 +37,9 @@ pub enum Transaction { PrivilegedL2Transaction(PrivilegedL2Transaction), } -#[derive(Debug)] +/// The same as a Transaction enum, only that blob transactions are in wrapped format, including +/// the blobs bundle. +#[derive(Clone, Debug, PartialEq, Eq)] pub enum P2PTransaction { LegacyTransaction(LegacyTransaction), EIP2930Transaction(EIP2930Transaction), @@ -46,6 +48,67 @@ pub enum P2PTransaction { PrivilegedL2Transaction(PrivilegedL2Transaction), } +impl TryInto for P2PTransaction { + type Error = &'static str; + + fn try_into(self) -> Result { + match self { + P2PTransaction::LegacyTransaction(itx) => Ok(Transaction::LegacyTransaction(itx)), + P2PTransaction::EIP2930Transaction(itx) => Ok(Transaction::EIP2930Transaction(itx)), + P2PTransaction::EIP1559Transaction(itx) => Ok(Transaction::EIP1559Transaction(itx)), + P2PTransaction::PrivilegedL2Transaction(itx) => { + Ok(Transaction::PrivilegedL2Transaction(itx)) + } + _ => Err("Can't convert blob p2p transaction into regular transaction. Blob bundle would be lost."), + } + } +} + +impl RLPEncode for P2PTransaction { + fn encode(&self, buf: &mut dyn bytes::BufMut) { + match self { + P2PTransaction::LegacyTransaction(t) => t.encode(buf), + tx => Bytes::copy_from_slice(&tx.encode_canonical_to_vec()).encode(buf), + }; + } +} + +impl RLPDecode for P2PTransaction { + fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { + if is_encoded_as_bytes(rlp)? { + // Adjust the encoding to get the payload + let payload = get_rlp_bytes_item_payload(rlp); + let tx_type = payload.first().unwrap(); + let tx_encoding = &payload[1..]; + // Look at the first byte to check if it corresponds to a TransactionType + match *tx_type { + // Legacy + 0x0 => LegacyTransaction::decode_unfinished(tx_encoding) + .map(|(tx, rem)| (P2PTransaction::LegacyTransaction(tx), rem)), // TODO: check if this is a real case scenario + // EIP2930 + 0x1 => EIP2930Transaction::decode_unfinished(tx_encoding) + .map(|(tx, rem)| (P2PTransaction::EIP2930Transaction(tx), rem)), + // EIP1559 + 0x2 => EIP1559Transaction::decode_unfinished(tx_encoding) + .map(|(tx, rem)| (P2PTransaction::EIP1559Transaction(tx), rem)), + // EIP4844 + 0x3 => WrappedEIP4844Transaction::decode_unfinished(tx_encoding) + .map(|(tx, rem)| (P2PTransaction::WrappedEIP4844Transaction(tx), rem)), + // PriviligedL2 + 0x7e => PrivilegedL2Transaction::decode_unfinished(tx_encoding) + .map(|(tx, rem)| (P2PTransaction::PrivilegedL2Transaction(tx), rem)), + ty => Err(RLPDecodeError::Custom(format!( + "Invalid transaction type: {ty}" + ))), + } + } else { + // LegacyTransaction + LegacyTransaction::decode_unfinished(rlp) + .map(|(tx, rem)| (P2PTransaction::LegacyTransaction(tx), rem)) + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct WrappedEIP4844Transaction { pub tx: EIP4844Transaction, @@ -1297,6 +1360,39 @@ mod canonic_encoding { buf } } + + impl P2PTransaction { + pub fn tx_type(&self) -> TxType { + match self { + P2PTransaction::LegacyTransaction(_) => TxType::Legacy, + P2PTransaction::EIP2930Transaction(_) => TxType::EIP2930, + P2PTransaction::EIP1559Transaction(_) => TxType::EIP1559, + P2PTransaction::WrappedEIP4844Transaction(_) => TxType::EIP4844, + P2PTransaction::PrivilegedL2Transaction(_) => TxType::Privileged, + } + } + + pub fn encode_canonical(&self, buf: &mut dyn bytes::BufMut) { + match self { + // Legacy transactions don't have a prefix + P2PTransaction::LegacyTransaction(_) => {} + _ => buf.put_u8(self.tx_type() as u8), + } + match self { + P2PTransaction::LegacyTransaction(t) => t.encode(buf), + P2PTransaction::EIP2930Transaction(t) => t.encode(buf), + P2PTransaction::EIP1559Transaction(t) => t.encode(buf), + P2PTransaction::WrappedEIP4844Transaction(t) => t.encode(buf), + P2PTransaction::PrivilegedL2Transaction(t) => t.encode(buf), + }; + } + + pub fn encode_canonical_to_vec(&self) -> Vec { + let mut buf = Vec::new(); + self.encode_canonical(&mut buf); + buf + } + } } // Serialization diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 1d273251d..28b53ab84 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -1,13 +1,19 @@ +use std::fmt::Display; +use std::fmt::Error; + use bytes::BufMut; use bytes::Bytes; +use ethrex_blockchain::error::MempoolError; use ethrex_blockchain::mempool; use ethrex_core::types::P2PTransaction; +use ethrex_core::types::WrappedEIP4844Transaction; use ethrex_core::{types::Transaction, H256}; use ethrex_rlp::{ error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; use ethrex_storage::{error::StoreError, Store}; +use thiserror::Error; use crate::rlpx::{ message::RLPxMessage, @@ -163,7 +169,7 @@ impl GetPooledTransactions { let txs = self .transaction_hashes .iter() - .map(|hash| store.get_transaction_by_hash(*hash)) + .map(|hash| Self::get_p2p_transaction(hash, store)) // Return an error in case anything failed. .collect::, _>>()? .into_iter() @@ -178,6 +184,39 @@ impl GetPooledTransactions { pooled_transactions: txs, }) } + + /// Gets a p2p transaction given a hash. + fn get_p2p_transaction( + hash: &H256, + store: &Store, + ) -> Result, StoreError> { + let Some(tx) = store.get_transaction_by_hash(*hash)? else { + return Ok(None); + }; + let result = match tx { + Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx), + Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx), + Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx), + Transaction::EIP4844Transaction(itx) => { + let Some(bundle) = store.get_blobs_bundle_from_pool(*hash)? else { + return Err(StoreError::Custom(format!( + "Blob transaction present without its bundle: hash {}", + hash + ))); + }; + + P2PTransaction::WrappedEIP4844Transaction(WrappedEIP4844Transaction { + tx: itx, + blobs_bundle: bundle, + }) + } + Transaction::PrivilegedL2Transaction(itx) => { + P2PTransaction::PrivilegedL2Transaction(itx) + } + }; + + Ok(Some(result)) + } } impl RLPxMessage for GetPooledTransactions { @@ -220,14 +259,17 @@ impl PooledTransactions { } } - pub fn handle(&self, store: &Store) -> Result<(), StoreError> { + pub fn handle(&self, store: &Store) -> Result<(), MempoolError> { // We need to save all transactions, one by one, and we also need the senders. for tx in self.pooled_transactions { - if let P2PTransaction::WrappedEIP4844Transaction() = tx { - mempool::add_blob_transaction(transaction, blobs_bundle, store) + if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx { + mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store)?; + } else { + let regular_tx = tx.try_into()?; + mempool::add_transaction(regular_tx, store)?; } } - Err(StoreError::Custom("Implement this plz".to_string())) + Ok(()) } } From 153fbe8a6631cb81a996cd828fbe17da022974ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 11 Dec 2024 18:14:21 -0300 Subject: [PATCH 12/15] compiling --- crates/common/types/transaction.rs | 4 ++-- crates/l2/utils/eth_client/mod.rs | 2 +- crates/networking/p2p/rlpx/eth/transactions.rs | 17 ++++++++--------- crates/networking/rpc/eth/transaction.rs | 2 +- crates/networking/rpc/types/transaction.rs | 11 +++-------- 5 files changed, 15 insertions(+), 21 deletions(-) diff --git a/crates/common/types/transaction.rs b/crates/common/types/transaction.rs index 9887fec01..f7f5002b3 100644 --- a/crates/common/types/transaction.rs +++ b/crates/common/types/transaction.rs @@ -49,7 +49,7 @@ pub enum P2PTransaction { } impl TryInto for P2PTransaction { - type Error = &'static str; + type Error = String; fn try_into(self) -> Result { match self { @@ -59,7 +59,7 @@ impl TryInto for P2PTransaction { P2PTransaction::PrivilegedL2Transaction(itx) => { Ok(Transaction::PrivilegedL2Transaction(itx)) } - _ => Err("Can't convert blob p2p transaction into regular transaction. Blob bundle would be lost."), + _ => Err("Can't convert blob p2p transaction into regular transaction. Blob bundle would be lost.".to_string()), } } } diff --git a/crates/l2/utils/eth_client/mod.rs b/crates/l2/utils/eth_client/mod.rs index 083d13f01..490d5eac5 100644 --- a/crates/l2/utils/eth_client/mod.rs +++ b/crates/l2/utils/eth_client/mod.rs @@ -13,6 +13,7 @@ use ethrex_core::{ types::{ BlobsBundle, EIP1559Transaction, EIP4844Transaction, GenericTransaction, PrivilegedL2Transaction, PrivilegedTxType, Signable, TxKind, TxType, + WrappedEIP4844Transaction, }, H160, }; @@ -21,7 +22,6 @@ use ethrex_rpc::{ types::{ block::RpcBlock, receipt::{RpcLog, RpcReceipt}, - transaction::WrappedEIP4844Transaction, }, utils::{RpcErrorResponse, RpcRequest, RpcRequestId, RpcSuccessResponse}, }; diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 28b53ab84..c2952b87e 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -1,6 +1,3 @@ -use std::fmt::Display; -use std::fmt::Error; - use bytes::BufMut; use bytes::Bytes; use ethrex_blockchain::error::MempoolError; @@ -13,7 +10,6 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; use ethrex_storage::{error::StoreError, Store}; -use thiserror::Error; use crate::rlpx::{ message::RLPxMessage, @@ -261,11 +257,14 @@ impl PooledTransactions { pub fn handle(&self, store: &Store) -> Result<(), MempoolError> { // We need to save all transactions, one by one, and we also need the senders. - for tx in self.pooled_transactions { - if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx { + for tx in &self.pooled_transactions { + if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx.clone() { mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store)?; } else { - let regular_tx = tx.try_into()?; + let regular_tx = tx + .clone() + .try_into() + .map_err(|error| MempoolError::StoreError(StoreError::Custom(error)))?; mempool::add_transaction(regular_tx, store)?; } } @@ -298,7 +297,7 @@ impl RLPxMessage for PooledTransactions { #[cfg(test)] mod tests { - use ethrex_core::{types::Transaction, H256}; + use ethrex_core::{types::P2PTransaction, H256}; use crate::rlpx::{ eth::transactions::{GetPooledTransactions, PooledTransactions}, @@ -337,7 +336,7 @@ mod tests { #[test] fn pooled_transactions_of_one_type() { - let transaction1 = Transaction::LegacyTransaction(Default::default()); + let transaction1 = P2PTransaction::LegacyTransaction(Default::default()); let pooled_transactions = vec![transaction1.clone()]; let pooled_transactions = PooledTransactions::new(1, pooled_transactions); diff --git a/crates/networking/rpc/eth/transaction.rs b/crates/networking/rpc/eth/transaction.rs index 2e3d0b076..61f92a3de 100644 --- a/crates/networking/rpc/eth/transaction.rs +++ b/crates/networking/rpc/eth/transaction.rs @@ -592,7 +592,7 @@ impl RpcHandler for SendRawTransactionRequest { mempool::add_blob_transaction( wrapped_blob_tx.tx.clone(), wrapped_blob_tx.blobs_bundle.clone(), - context.storage, + &context.storage, ) } else { mempool::add_transaction(self.to_transaction(), &context.storage) diff --git a/crates/networking/rpc/types/transaction.rs b/crates/networking/rpc/types/transaction.rs index d1980c71b..0f63205af 100644 --- a/crates/networking/rpc/types/transaction.rs +++ b/crates/networking/rpc/types/transaction.rs @@ -1,17 +1,12 @@ use ethrex_core::{ serde_utils, types::{ - BlobsBundle, BlockHash, BlockNumber, EIP1559Transaction, EIP2930Transaction, - EIP4844Transaction, LegacyTransaction, PrivilegedL2Transaction, Transaction, + BlockHash, BlockNumber, EIP1559Transaction, EIP2930Transaction, LegacyTransaction, + PrivilegedL2Transaction, Transaction, WrappedEIP4844Transaction, }, Address, H256, }; -use ethrex_rlp::{ - decode::RLPDecode, - encode::RLPEncode, - error::RLPDecodeError, - structs::{Decoder, Encoder}, -}; +use ethrex_rlp::{decode::RLPDecode, error::RLPDecodeError}; use serde::{Deserialize, Serialize}; #[allow(unused)] From 6b02dee7c814f5e94faa4fe8478b0b830d340d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 11 Dec 2024 18:15:44 -0300 Subject: [PATCH 13/15] make clippy happy --- crates/networking/p2p/rlpx/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index aa6f745b1..e415fea1e 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -22,7 +22,7 @@ use crate::{ use super::{ error::RLPxError, - eth::transactions::{GetPooledTransactions, PooledTransactions}, + eth::transactions::GetPooledTransactions, frame, handshake::{decode_ack_message, decode_auth_message, encode_auth_message}, message::{self as rlpx}, From c46cdb2a90f5165f799d9275e8a33ef1742d5c08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 11 Dec 2024 19:01:23 -0300 Subject: [PATCH 14/15] Add new passing test --- .github/workflows/ci_l1.yaml | 2 +- crates/networking/p2p/rlpx/eth/transactions.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci_l1.yaml b/.github/workflows/ci_l1.yaml index 264f40824..636587cb6 100644 --- a/.github/workflows/ci_l1.yaml +++ b/.github/workflows/ci_l1.yaml @@ -157,7 +157,7 @@ jobs: test_pattern: /AccountRange|StorageRanges|ByteCodes|TrieNodes - name: "Devp2p eth tests" simulation: devp2p - test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs + test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|BlobViolations - name: "Engine Auth and EC tests" simulation: ethereum/engine test_pattern: engine-(auth|exchange-capabilities)/ diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index c2952b87e..87e80c4a8 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -255,8 +255,8 @@ impl PooledTransactions { } } + /// Saves every incoming pooled transaction to the mempool. pub fn handle(&self, store: &Store) -> Result<(), MempoolError> { - // We need to save all transactions, one by one, and we also need the senders. for tx in &self.pooled_transactions { if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx.clone() { mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store)?; From 0adc376c6a55c2106aff7269ceb910b5db281a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 3 Jan 2025 17:27:28 -0300 Subject: [PATCH 15/15] address some comments --- crates/common/types/transaction.rs | 15 +++++++-------- crates/networking/p2p/rlpx/connection.rs | 1 - crates/networking/p2p/rlpx/eth/transactions.rs | 11 +++++------ 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/crates/common/types/transaction.rs b/crates/common/types/transaction.rs index 51adaf923..93d0483b3 100644 --- a/crates/common/types/transaction.rs +++ b/crates/common/types/transaction.rs @@ -44,7 +44,7 @@ pub enum P2PTransaction { LegacyTransaction(LegacyTransaction), EIP2930Transaction(EIP2930Transaction), EIP1559Transaction(EIP1559Transaction), - WrappedEIP4844Transaction(WrappedEIP4844Transaction), + EIP4844TransactionWithBlobs(WrappedEIP4844Transaction), PrivilegedL2Transaction(PrivilegedL2Transaction), } @@ -76,10 +76,9 @@ impl RLPEncode for P2PTransaction { impl RLPDecode for P2PTransaction { fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { if is_encoded_as_bytes(rlp)? { - // Adjust the encoding to get the payload - let payload = get_rlp_bytes_item_payload(rlp); - let tx_type = payload.first().unwrap(); - let tx_encoding = &payload[1..]; + let payload = get_rlp_bytes_item_payload(rlp)?; + let tx_type = payload.first().ok_or(RLPDecodeError::InvalidLength)?; + let tx_encoding = &payload.get(1..).ok_or(RLPDecodeError::InvalidLength)?; // Look at the first byte to check if it corresponds to a TransactionType match *tx_type { // Legacy @@ -93,7 +92,7 @@ impl RLPDecode for P2PTransaction { .map(|(tx, rem)| (P2PTransaction::EIP1559Transaction(tx), rem)), // EIP4844 0x3 => WrappedEIP4844Transaction::decode_unfinished(tx_encoding) - .map(|(tx, rem)| (P2PTransaction::WrappedEIP4844Transaction(tx), rem)), + .map(|(tx, rem)| (P2PTransaction::EIP4844TransactionWithBlobs(tx), rem)), // PriviligedL2 0x7e => PrivilegedL2Transaction::decode_unfinished(tx_encoding) .map(|(tx, rem)| (P2PTransaction::PrivilegedL2Transaction(tx), rem)), @@ -1370,7 +1369,7 @@ mod canonic_encoding { P2PTransaction::LegacyTransaction(_) => TxType::Legacy, P2PTransaction::EIP2930Transaction(_) => TxType::EIP2930, P2PTransaction::EIP1559Transaction(_) => TxType::EIP1559, - P2PTransaction::WrappedEIP4844Transaction(_) => TxType::EIP4844, + P2PTransaction::EIP4844TransactionWithBlobs(_) => TxType::EIP4844, P2PTransaction::PrivilegedL2Transaction(_) => TxType::Privileged, } } @@ -1385,7 +1384,7 @@ mod canonic_encoding { P2PTransaction::LegacyTransaction(t) => t.encode(buf), P2PTransaction::EIP2930Transaction(t) => t.encode(buf), P2PTransaction::EIP1559Transaction(t) => t.encode(buf), - P2PTransaction::WrappedEIP4844Transaction(t) => t.encode(buf), + P2PTransaction::EIP4844TransactionWithBlobs(t) => t.encode(buf), P2PTransaction::PrivilegedL2Transaction(t) => t.encode(buf), }; } diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index a4e0d9c83..dd213bbe6 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -403,7 +403,6 @@ impl RLPxConnection { let request = GetPooledTransactions::new(random(), hashes); self.send(Message::GetPooledTransactions(request)).await?; } - // TODO: Also add handler for get pooled transactions. Message::GetPooledTransactions(msg) => { let response = msg.handle(&self.storage)?; self.send(Message::PooledTransactions(response)).await?; diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 87e80c4a8..de1aa8404 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -162,6 +162,7 @@ impl GetPooledTransactions { } pub fn handle(&self, store: &Store) -> Result { + // TODO(#1615): get transactions in batch instead of iterating over them. let txs = self .transaction_hashes .iter() @@ -174,7 +175,6 @@ impl GetPooledTransactions { .flatten() .collect(); - // TODO: add getting of the blob bundle, as we'll implement this as a p2p transaction. Ok(PooledTransactions { id: self.id, pooled_transactions: txs, @@ -201,7 +201,7 @@ impl GetPooledTransactions { ))); }; - P2PTransaction::WrappedEIP4844Transaction(WrappedEIP4844Transaction { + P2PTransaction::EIP4844TransactionWithBlobs(WrappedEIP4844Transaction { tx: itx, blobs_bundle: bundle, }) @@ -256,13 +256,12 @@ impl PooledTransactions { } /// Saves every incoming pooled transaction to the mempool. - pub fn handle(&self, store: &Store) -> Result<(), MempoolError> { - for tx in &self.pooled_transactions { - if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx.clone() { + pub fn handle(self, store: &Store) -> Result<(), MempoolError> { + for tx in self.pooled_transactions { + if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx { mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store)?; } else { let regular_tx = tx - .clone() .try_into() .map_err(|error| MempoolError::StoreError(StoreError::Custom(error)))?; mempool::add_transaction(regular_tx, store)?;