Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(l1): pooled transactions #1444

Merged
merged 22 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_l1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ jobs:
test_pattern: /AccountRange|StorageRanges|ByteCodes|TrieNodes
- name: "Devp2p eth tests"
simulation: devp2p
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|BlobViolations
- name: "Engine Auth and EC tests"
simulation: ethereum/engine
test_pattern: engine-(auth|exchange-capabilities)/
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ethrex_storage::{error::StoreError, Store};
pub fn add_blob_transaction(
transaction: EIP4844Transaction,
blobs_bundle: BlobsBundle,
store: Store,
store: &Store,
) -> Result<H256, MempoolError> {
// Validate blobs bundle
blobs_bundle.validate(&transaction)?;
Expand Down
145 changes: 145 additions & 0 deletions crates/common/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use ethrex_rlp::{
structs::{Decoder, Encoder},
};

use super::BlobsBundle;

// The `#[serde(untagged)]` attribute allows the `Transaction` enum to be serialized without
// a tag indicating the variant type. This means that Serde will serialize the enum's variants
// directly according to the structure of the variant itself.
Expand All @@ -35,6 +37,116 @@ pub enum Transaction {
PrivilegedL2Transaction(PrivilegedL2Transaction),
}

/// The same as a Transaction enum, only that blob transactions are in wrapped format, including
/// the blobs bundle.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum P2PTransaction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a structure specifically made to fit the format expected by p2p can we move it there? Placing it in the same module as core types can lead to mix ups in the future

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm addressing fran's comments first, if they are all right we can then move it.

Arkenan marked this conversation as resolved.
Show resolved Hide resolved
LegacyTransaction(LegacyTransaction),
EIP2930Transaction(EIP2930Transaction),
EIP1559Transaction(EIP1559Transaction),
WrappedEIP4844Transaction(WrappedEIP4844Transaction),
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
PrivilegedL2Transaction(PrivilegedL2Transaction),
}

impl TryInto<Transaction> for P2PTransaction {
type Error = String;

fn try_into(self) -> Result<Transaction, Self::Error> {
match self {
P2PTransaction::LegacyTransaction(itx) => Ok(Transaction::LegacyTransaction(itx)),
P2PTransaction::EIP2930Transaction(itx) => Ok(Transaction::EIP2930Transaction(itx)),
P2PTransaction::EIP1559Transaction(itx) => Ok(Transaction::EIP1559Transaction(itx)),
P2PTransaction::PrivilegedL2Transaction(itx) => {
Ok(Transaction::PrivilegedL2Transaction(itx))
}
_ => Err("Can't convert blob p2p transaction into regular transaction. Blob bundle would be lost.".to_string()),
}
}
}

impl RLPEncode for P2PTransaction {
fn encode(&self, buf: &mut dyn bytes::BufMut) {
match self {
P2PTransaction::LegacyTransaction(t) => t.encode(buf),
tx => Bytes::copy_from_slice(&tx.encode_canonical_to_vec()).encode(buf),
};
}
}

impl RLPDecode for P2PTransaction {
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if is_encoded_as_bytes(rlp)? {
// Adjust the encoding to get the payload
let payload = get_rlp_bytes_item_payload(rlp);
let tx_type = payload.first().unwrap();
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
let tx_encoding = &payload[1..];
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
// Look at the first byte to check if it corresponds to a TransactionType
match *tx_type {
// Legacy
0x0 => LegacyTransaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::LegacyTransaction(tx), rem)), // TODO: check if this is a real case scenario
// EIP2930
0x1 => EIP2930Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::EIP2930Transaction(tx), rem)),
// EIP1559
0x2 => EIP1559Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::EIP1559Transaction(tx), rem)),
// EIP4844
0x3 => WrappedEIP4844Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::WrappedEIP4844Transaction(tx), rem)),
// PriviligedL2
0x7e => PrivilegedL2Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::PrivilegedL2Transaction(tx), rem)),
ty => Err(RLPDecodeError::Custom(format!(
"Invalid transaction type: {ty}"
))),
}
} else {
// LegacyTransaction
LegacyTransaction::decode_unfinished(rlp)
.map(|(tx, rem)| (P2PTransaction::LegacyTransaction(tx), rem))
}
}
}
Arkenan marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WrappedEIP4844Transaction {
pub tx: EIP4844Transaction,
pub blobs_bundle: BlobsBundle,
}

impl RLPEncode for WrappedEIP4844Transaction {
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
fn encode(&self, buf: &mut dyn bytes::BufMut) {
let encoder = Encoder::new(buf);
encoder
.encode_field(&self.tx)
.encode_field(&self.blobs_bundle.blobs)
.encode_field(&self.blobs_bundle.commitments)
.encode_field(&self.blobs_bundle.proofs)
.finish();
}
}

impl RLPDecode for WrappedEIP4844Transaction {
fn decode_unfinished(rlp: &[u8]) -> Result<(WrappedEIP4844Transaction, &[u8]), RLPDecodeError> {
let decoder = Decoder::new(rlp)?;
let (tx, decoder) = decoder.decode_field("tx")?;
let (blobs, decoder) = decoder.decode_field("blobs")?;
let (commitments, decoder) = decoder.decode_field("commitments")?;
let (proofs, decoder) = decoder.decode_field("proofs")?;

let wrapped = WrappedEIP4844Transaction {
tx,
blobs_bundle: BlobsBundle {
blobs,
commitments,
proofs,
},
};
Ok((wrapped, decoder.finish()?))
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct LegacyTransaction {
pub nonce: u64,
Expand Down Expand Up @@ -1254,6 +1366,39 @@ mod canonic_encoding {
buf
}
}

impl P2PTransaction {
pub fn tx_type(&self) -> TxType {
match self {
P2PTransaction::LegacyTransaction(_) => TxType::Legacy,
P2PTransaction::EIP2930Transaction(_) => TxType::EIP2930,
P2PTransaction::EIP1559Transaction(_) => TxType::EIP1559,
P2PTransaction::WrappedEIP4844Transaction(_) => TxType::EIP4844,
P2PTransaction::PrivilegedL2Transaction(_) => TxType::Privileged,
}
}

pub fn encode_canonical(&self, buf: &mut dyn bytes::BufMut) {
match self {
// Legacy transactions don't have a prefix
P2PTransaction::LegacyTransaction(_) => {}
_ => buf.put_u8(self.tx_type() as u8),
}
match self {
P2PTransaction::LegacyTransaction(t) => t.encode(buf),
P2PTransaction::EIP2930Transaction(t) => t.encode(buf),
P2PTransaction::EIP1559Transaction(t) => t.encode(buf),
P2PTransaction::WrappedEIP4844Transaction(t) => t.encode(buf),
P2PTransaction::PrivilegedL2Transaction(t) => t.encode(buf),
};
}

pub fn encode_canonical_to_vec(&self) -> Vec<u8> {
let mut buf = Vec::new();
self.encode_canonical(&mut buf);
buf
}
}
}

// Serialization
Expand Down
2 changes: 1 addition & 1 deletion crates/l2/utils/eth_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ethrex_core::{
types::{
BlobsBundle, EIP1559Transaction, EIP4844Transaction, GenericTransaction,
PrivilegedL2Transaction, PrivilegedTxType, Signable, TxKind, TxType,
WrappedEIP4844Transaction,
},
H160,
};
Expand All @@ -21,7 +22,6 @@ use ethrex_rpc::{
types::{
block::RpcBlock,
receipt::{RpcLog, RpcReceipt},
transaction::WrappedEIP4844Transaction,
},
utils::{RpcErrorResponse, RpcRequest, RpcRequestId, RpcSuccessResponse},
};
Expand Down
8 changes: 8 additions & 0 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
let request = GetPooledTransactions::new(random(), hashes);
self.send(Message::GetPooledTransactions(request)).await?;
}
// TODO: Also add handler for get pooled transactions.
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
Message::GetPooledTransactions(msg) => {
let response = msg.handle(&self.storage)?;
self.send(Message::PooledTransactions(response)).await?;
}
Message::PooledTransactions(msg) if peer_supports_eth => {
msg.handle(&self.storage)?;
}
Message::GetStorageRanges(req) => {
let response = process_storage_ranges_request(req, self.storage.clone())?;
self.send(Message::StorageRanges(response)).await?
Expand Down
84 changes: 79 additions & 5 deletions crates/networking/p2p/rlpx/eth/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use bytes::BufMut;
use bytes::Bytes;
use ethrex_blockchain::error::MempoolError;
use ethrex_blockchain::mempool;
use ethrex_core::types::P2PTransaction;
use ethrex_core::types::WrappedEIP4844Transaction;
use ethrex_core::{types::Transaction, H256};
use ethrex_rlp::{
error::{RLPDecodeError, RLPEncodeError},
Expand Down Expand Up @@ -156,6 +160,59 @@ impl GetPooledTransactions {
id,
}
}

pub fn handle(&self, store: &Store) -> Result<PooledTransactions, StoreError> {
let txs = self
.transaction_hashes
.iter()
.map(|hash| Self::get_p2p_transaction(hash, store))
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
// Return an error in case anything failed.
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// As per the spec, Nones are perfectly acceptable, for example if a transaction was
// taken out of the mempool due to payload building after being advertised.
.flatten()
.collect();

// TODO: add getting of the blob bundle, as we'll implement this as a p2p transaction.
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
Ok(PooledTransactions {
id: self.id,
pooled_transactions: txs,
})
}

/// Gets a p2p transaction given a hash.
fn get_p2p_transaction(
hash: &H256,
store: &Store,
) -> Result<Option<P2PTransaction>, StoreError> {
let Some(tx) = store.get_transaction_by_hash(*hash)? else {
return Ok(None);
};
let result = match tx {
Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx),
Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx),
Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx),
Transaction::EIP4844Transaction(itx) => {
let Some(bundle) = store.get_blobs_bundle_from_pool(*hash)? else {
return Err(StoreError::Custom(format!(
"Blob transaction present without its bundle: hash {}",
hash
)));
};

P2PTransaction::WrappedEIP4844Transaction(WrappedEIP4844Transaction {
tx: itx,
blobs_bundle: bundle,
})
}
Transaction::PrivilegedL2Transaction(itx) => {
P2PTransaction::PrivilegedL2Transaction(itx)
}
};

Ok(Some(result))
}
}

impl RLPxMessage for GetPooledTransactions {
Expand All @@ -182,20 +239,37 @@ impl RLPxMessage for GetPooledTransactions {
}

// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#pooledtransactions-0x0a
#[derive(Debug)]
pub(crate) struct PooledTransactions {
// id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
id: u64,
pooled_transactions: Vec<Transaction>,
pooled_transactions: Vec<P2PTransaction>,
}

impl PooledTransactions {
pub fn new(id: u64, pooled_transactions: Vec<Transaction>) -> Self {
pub fn new(id: u64, pooled_transactions: Vec<P2PTransaction>) -> Self {
Self {
pooled_transactions,
id,
}
}

/// Saves every incoming pooled transaction to the mempool.
pub fn handle(&self, store: &Store) -> Result<(), MempoolError> {
Arkenan marked this conversation as resolved.
Show resolved Hide resolved
for tx in &self.pooled_transactions {
if let P2PTransaction::WrappedEIP4844Transaction(itx) = tx.clone() {
mempool::add_blob_transaction(itx.tx, itx.blobs_bundle, store)?;
} else {
let regular_tx = tx
.clone()
.try_into()
.map_err(|error| MempoolError::StoreError(StoreError::Custom(error)))?;
mempool::add_transaction(regular_tx, store)?;
}
}
Ok(())
}
}

impl RLPxMessage for PooledTransactions {
Expand All @@ -214,7 +288,7 @@ impl RLPxMessage for PooledTransactions {
let decompressed_data = snappy_decompress(msg_data)?;
let decoder = Decoder::new(&decompressed_data)?;
let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
let (pooled_transactions, _): (Vec<Transaction>, _) =
let (pooled_transactions, _): (Vec<P2PTransaction>, _) =
decoder.decode_field("pooledTransactions")?;

Ok(Self::new(id, pooled_transactions))
Expand All @@ -223,7 +297,7 @@ impl RLPxMessage for PooledTransactions {

#[cfg(test)]
mod tests {
use ethrex_core::{types::Transaction, H256};
use ethrex_core::{types::P2PTransaction, H256};

use crate::rlpx::{
eth::transactions::{GetPooledTransactions, PooledTransactions},
Expand Down Expand Up @@ -262,7 +336,7 @@ mod tests {

#[test]
fn pooled_transactions_of_one_type() {
let transaction1 = Transaction::LegacyTransaction(Default::default());
let transaction1 = P2PTransaction::LegacyTransaction(Default::default());
let pooled_transactions = vec![transaction1.clone()];
let pooled_transactions = PooledTransactions::new(1, pooled_transactions);

Expand Down
13 changes: 12 additions & 1 deletion crates/networking/p2p/rlpx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::fmt::Display;
use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders};
use super::eth::receipts::Receipts;
use super::eth::status::StatusMessage;
use super::eth::transactions::{GetPooledTransactions, NewPooledTransactionHashes, Transactions};
use super::eth::transactions::{
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
};
use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage};
use super::snap::{
AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes,
Expand Down Expand Up @@ -34,6 +36,7 @@ pub(crate) enum Message {
BlockBodies(BlockBodies),
NewPooledTransactionHashes(NewPooledTransactionHashes),
GetPooledTransactions(GetPooledTransactions),
PooledTransactions(PooledTransactions),
Receipts(Receipts),
// snap capability
GetAccountRange(GetAccountRange),
Expand Down Expand Up @@ -75,6 +78,9 @@ impl Message {
0x19 => Ok(Message::GetPooledTransactions(
GetPooledTransactions::decode(msg_data)?,
)),
0x1a => Ok(Message::PooledTransactions(PooledTransactions::decode(
msg_data,
)?)),
0x20 => Ok(Message::Receipts(Receipts::decode(msg_data)?)),
0x21 => Ok(Message::GetAccountRange(GetAccountRange::decode(msg_data)?)),
0x22 => Ok(Message::AccountRange(AccountRange::decode(msg_data)?)),
Expand Down Expand Up @@ -140,6 +146,10 @@ impl Message {
0x19_u8.encode(buf);
msg.encode(buf)
}
Message::PooledTransactions(msg) => {
0x1a_u8.encode(buf);
msg.encode(buf)
}
Message::Receipts(msg) => {
0x20_u8.encode(buf);
msg.encode(buf)
Expand Down Expand Up @@ -193,6 +203,7 @@ impl Display for Message {
Message::BlockBodies(_) => "eth:BlockBodies".fmt(f),
Message::NewPooledTransactionHashes(_) => "eth:NewPooledTransactionHashes".fmt(f),
Message::GetPooledTransactions(_) => "eth::GetPooledTransactions".fmt(f),
Message::PooledTransactions(_) => "eth::PooledTransactions".fmt(f),
Message::Transactions(_) => "eth:TransactionsMessage".fmt(f),
Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f),
Message::Receipts(_) => "eth:Receipts".fmt(f),
Expand Down
Loading
Loading