From c853b2c37bb06ad9d8edb25703eee6708718b66c Mon Sep 17 00:00:00 2001 From: Kevin Heavey Date: Fri, 10 Jan 2025 14:45:46 +0100 Subject: [PATCH 1/5] remove solana-sdk from cli-output (#4362) * remove solana-sdk from cli-output * missing dev dep --- Cargo.lock | 21 +++++++++++- cli-output/Cargo.toml | 21 +++++++++++- cli-output/src/cli_output.rs | 39 +++++++++++---------- cli-output/src/display.rs | 66 +++++++++++++++++++----------------- programs/sbf/Cargo.lock | 18 +++++++++- svm/examples/Cargo.lock | 18 +++++++++- 6 files changed, 127 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c18516c73bfd68..a9b0f0e2fb25b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6597,11 +6597,30 @@ dependencies = [ "semver 1.0.24", "serde", "serde_json", + "solana-account", "solana-account-decoder", + "solana-bincode", "solana-clap-utils", "solana-cli-config", + "solana-clock", + "solana-epoch-info", + "solana-hash", + "solana-keypair", + "solana-message", + "solana-native-token", + "solana-packet", + "solana-program", + "solana-pubkey", + "solana-reserved-account-keys", "solana-rpc-client-api", - "solana-sdk", + "solana-sdk-ids", + "solana-signature", + "solana-signer", + "solana-system-interface", + "solana-sysvar", + "solana-transaction", + "solana-transaction-context", + "solana-transaction-error", "solana-transaction-status", "solana-vote-program", "spl-memo", diff --git a/cli-output/Cargo.toml b/cli-output/Cargo.toml index b397c6d498b0de..ad0f4ed708e4c8 100644 --- a/cli-output/Cargo.toml +++ b/cli-output/Cargo.toml @@ -21,17 +21,36 @@ pretty-hex = { workspace = true } semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +solana-account = { workspace = true } solana-account-decoder = { workspace = true } +solana-bincode = { workspace = true } solana-clap-utils = { workspace = true } solana-cli-config = { workspace = true } +solana-clock = { workspace = true } +solana-epoch-info = { workspace = true } +solana-hash = { workspace = true } +solana-message = { workspace = true } +solana-native-token = { workspace = true } +solana-packet = { workspace = true } +solana-program = { workspace = true } +solana-pubkey = { workspace = true } +solana-reserved-account-keys = { workspace = true } solana-rpc-client-api = { workspace = true } -solana-sdk = { workspace = true } +solana-sdk-ids = { workspace = true } +solana-signature = { workspace = true } +solana-system-interface = { workspace = true } +solana-sysvar = { workspace = true } +solana-transaction = { workspace = true } +solana-transaction-error = { workspace = true } solana-transaction-status = { workspace = true } solana-vote-program = { workspace = true } spl-memo = { workspace = true, features = ["no-entrypoint"] } [dev-dependencies] ed25519-dalek = { workspace = true } +solana-keypair = { workspace = true } +solana-signer = { workspace = true } +solana-transaction-context = { workspace = true } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/cli-output/src/cli_output.rs b/cli-output/src/cli_output.rs index f3dbb931825474..989ae154c97811 100644 --- a/cli-output/src/cli_output.rs +++ b/cli-output/src/cli_output.rs @@ -16,27 +16,26 @@ use { inflector::cases::titlecase::to_title_case, serde::{Deserialize, Serialize}, serde_json::{Map, Value}, + solana_account::ReadableAccount, solana_account_decoder::{ encode_ui_account, parse_account_data::AccountAdditionalDataV2, parse_token::UiTokenAccount, UiAccountEncoding, UiDataSliceConfig, }, 
solana_clap_utils::keypair::SignOnly, + solana_clock::{Epoch, Slot, UnixTimestamp}, + solana_epoch_info::EpochInfo, + solana_hash::Hash, + solana_native_token::lamports_to_sol, + solana_program::stake::state::{Authorized, Lockup}, + solana_pubkey::Pubkey, solana_rpc_client_api::response::{ RpcAccountBalance, RpcContactInfo, RpcInflationGovernor, RpcInflationRate, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo, }, - solana_sdk::{ - account::ReadableAccount, - clock::{Epoch, Slot, UnixTimestamp}, - epoch_info::EpochInfo, - hash::Hash, - native_token::lamports_to_sol, - pubkey::Pubkey, - signature::Signature, - stake::state::{Authorized, Lockup}, - stake_history::StakeHistoryEntry, - transaction::{Transaction, TransactionError, VersionedTransaction}, - }, + solana_signature::Signature, + solana_sysvar::stake_history::StakeHistoryEntry, + solana_transaction::{versioned::VersionedTransaction, Transaction}, + solana_transaction_error::TransactionError, solana_transaction_status::{ EncodedConfirmedBlock, EncodedTransaction, TransactionConfirmationStatus, UiTransactionStatusMeta, @@ -3280,13 +3279,13 @@ mod tests { use { super::*, clap::{App, Arg}, - solana_sdk::{ - message::Message, - pubkey::Pubkey, - signature::{keypair_from_seed, NullSigner, Signature, Signer, SignerError}, - system_instruction, - transaction::Transaction, - }, + solana_keypair::keypair_from_seed, + solana_message::Message, + solana_pubkey::Pubkey, + solana_signature::Signature, + solana_signer::{null_signer::NullSigner, Signer, SignerError}, + solana_system_interface::instruction::transfer, + solana_transaction::Transaction, }; #[test] @@ -3324,7 +3323,7 @@ mod tests { let fee_payer = absent.pubkey(); let nonce_auth = bad.pubkey(); let mut tx = Transaction::new_unsigned(Message::new_with_nonce( - vec![system_instruction::transfer(&from, &to, 42)], + vec![transfer(&from, &to, 42)], Some(&fee_payer), &nonce, &nonce_auth, diff --git a/cli-output/src/display.rs b/cli-output/src/display.rs index bde22b181e4e00..9a31d11692beb6 100644 --- a/cli-output/src/display.rs +++ b/cli-output/src/display.rs @@ -4,20 +4,18 @@ use { chrono::{DateTime, Local, SecondsFormat, TimeZone, Utc}, console::style, indicatif::{ProgressBar, ProgressStyle}, + solana_bincode::limited_deserialize, solana_cli_config::SettingType, - solana_sdk::{ - clock::UnixTimestamp, - hash::Hash, - instruction::CompiledInstruction, - message::v0::MessageAddressTableLookup, - native_token::lamports_to_sol, - program_utils::limited_deserialize, - pubkey::Pubkey, - reserved_account_keys::ReservedAccountKeys, - signature::Signature, - stake, - transaction::{TransactionError, TransactionVersion, VersionedTransaction}, - }, + solana_clock::UnixTimestamp, + solana_hash::Hash, + solana_message::{compiled_instruction::CompiledInstruction, v0::MessageAddressTableLookup}, + solana_native_token::lamports_to_sol, + solana_program::stake, + solana_pubkey::Pubkey, + solana_reserved_account_keys::ReservedAccountKeys, + solana_signature::Signature, + solana_transaction::versioned::{TransactionVersion, VersionedTransaction}, + solana_transaction_error::TransactionError, solana_transaction_status::{ Rewards, UiReturnDataEncoding, UiTransactionReturnData, UiTransactionStatusMeta, }, @@ -440,24 +438,29 @@ fn write_instruction<'a, W: io::Write>( let mut raw = true; if let AccountKeyType::Known(program_pubkey) = program_pubkey { if program_pubkey == &solana_vote_program::id() { - if let Ok(vote_instruction) = limited_deserialize::< - solana_vote_program::vote_instruction::VoteInstruction, - 
>(&instruction.data) + if let Ok(vote_instruction) = + limited_deserialize::( + &instruction.data, + solana_packet::PACKET_DATA_SIZE as u64, + ) { writeln!(w, "{prefix} {vote_instruction:?}")?; raw = false; } } else if program_pubkey == &stake::program::id() { - if let Ok(stake_instruction) = - limited_deserialize::(&instruction.data) - { + if let Ok(stake_instruction) = limited_deserialize::( + &instruction.data, + solana_packet::PACKET_DATA_SIZE as u64, + ) { writeln!(w, "{prefix} {stake_instruction:?}")?; raw = false; } - } else if program_pubkey == &solana_sdk::system_program::id() { - if let Ok(system_instruction) = limited_deserialize::< - solana_sdk::system_instruction::SystemInstruction, - >(&instruction.data) + } else if program_pubkey == &solana_sdk_ids::system_program::id() { + if let Ok(system_instruction) = + limited_deserialize::( + &instruction.data, + solana_packet::PACKET_DATA_SIZE as u64, + ) { writeln!(w, "{prefix} {system_instruction:?}")?; raw = false; @@ -723,16 +726,15 @@ pub fn unix_timestamp_to_string(unix_timestamp: UnixTimestamp) -> String { mod test { use { super::*, - solana_sdk::{ - message::{ - v0::{self, LoadedAddresses}, - Message as LegacyMessage, MessageHeader, VersionedMessage, - }, - pubkey::Pubkey, - signature::{Keypair, Signer}, - transaction::Transaction, - transaction_context::TransactionReturnData, + solana_keypair::Keypair, + solana_message::{ + v0::{self, LoadedAddresses}, + Message as LegacyMessage, MessageHeader, VersionedMessage, }, + solana_pubkey::Pubkey, + solana_signer::Signer, + solana_transaction::Transaction, + solana_transaction_context::TransactionReturnData, solana_transaction_status::{Reward, RewardType, TransactionStatusMeta}, std::io::BufWriter, }; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4bc43647e7e6fa..b163d9e8664892 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5304,11 +5304,27 @@ dependencies = [ "semver", "serde", "serde_json", + "solana-account", "solana-account-decoder", + "solana-bincode", "solana-clap-utils", "solana-cli-config", + "solana-clock", + "solana-epoch-info", + "solana-hash", + "solana-message", + "solana-native-token", + "solana-packet", + "solana-program", + "solana-pubkey", + "solana-reserved-account-keys", "solana-rpc-client-api", - "solana-sdk", + "solana-sdk-ids", + "solana-signature", + "solana-system-interface", + "solana-sysvar", + "solana-transaction", + "solana-transaction-error", "solana-transaction-status", "solana-vote-program", "spl-memo", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index a22877d3519108..3230817a4da1d2 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5155,11 +5155,27 @@ dependencies = [ "semver", "serde", "serde_json", + "solana-account", "solana-account-decoder", + "solana-bincode", "solana-clap-utils", "solana-cli-config", + "solana-clock", + "solana-epoch-info", + "solana-hash", + "solana-message", + "solana-native-token", + "solana-packet", + "solana-program", + "solana-pubkey", + "solana-reserved-account-keys", "solana-rpc-client-api", - "solana-sdk", + "solana-sdk-ids", + "solana-signature", + "solana-system-interface", + "solana-sysvar", + "solana-transaction", + "solana-transaction-error", "solana-transaction-status", "solana-vote-program", "spl-memo", From 48930187e348b01d93a0d6e8990f3ee481b856c3 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Fri, 10 Jan 2025 17:55:10 +0300 Subject: [PATCH 2/5] remove test-only function from prod code (#4355) rename function that is for 
tests only --- gossip/src/cluster_info.rs | 2 +- local-cluster/src/cluster_tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 8a21e9fab7e749..d8649a9150b37c 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3035,7 +3035,7 @@ impl Node { } } -pub fn push_messages_to_peer( +pub fn push_messages_to_peer_for_tests( messages: Vec, self_id: Pubkey, peer_gossip: SocketAddr, diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index e02645601282b3..277165f681d869 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -680,7 +680,7 @@ pub fn submit_vote_to_cluster_gossip( None, ); - cluster_info::push_messages_to_peer( + cluster_info::push_messages_to_peer_for_tests( vec![CrdsValue::new( CrdsData::Vote( 0, From e8463351d94a49543e00ba8eee559f29b93bd0ed Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 11 Jan 2025 00:13:55 +0900 Subject: [PATCH 3/5] Support tx poh recording in unified scheduler (#4150) * Support tx poh recording in unified scheduler * Make transaction_indexes allocation conditional * Replace Option> with saner type * Explain the odd transaction_status_sender.is_some() * Explain about Option-ed closures * Rename CommitFailed => CommitCancelled * Fix typos * Document and simplify pre_commit_callback handling * Clean up pre_commit_callback wrapping code * Extend pre_commit_callback for existing bailouts * Clean up index population * Freeze-lock block-producing unified scheduler bank --- Cargo.lock | 5 + ledger/benches/blockstore_processor.rs | 1 + ledger/src/blockstore_processor.rs | 372 ++++++++++++++---- poh/src/poh_recorder.rs | 38 +- programs/sbf/Cargo.lock | 2 + runtime/Cargo.toml | 1 + runtime/src/bank.rs | 154 +++++--- runtime/src/installed_scheduler_pool.rs | 21 +- sdk/transaction-error/src/lib.rs | 5 + storage-proto/proto/transaction_by_addr.proto | 1 + storage-proto/src/convert.rs | 4 + svm/examples/Cargo.lock | 2 + svm/src/transaction_processing_result.rs | 13 + transaction-status/src/token_balances.rs | 1 + unified-scheduler-logic/src/lib.rs | 6 + unified-scheduler-pool/Cargo.toml | 4 + unified-scheduler-pool/src/lib.rs | 197 +++++++++- 17 files changed, 692 insertions(+), 135 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9b0f0e2fb25b8..88da797360e920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8853,6 +8853,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -10124,9 +10125,12 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-clock", + "solana-entry", + "solana-hash", "solana-keypair", "solana-ledger", "solana-logger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", @@ -10136,6 +10140,7 @@ dependencies = [ "solana-transaction-error", "solana-unified-scheduler-logic", "static_assertions", + "test-case", "vec_extract_if_polyfill", ] diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 44f65db1d54fd4..c4d39cb1813b73 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -162,6 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, + None:: _>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ac6b578c193e6e..6f0f35d8f4bb7a
100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -28,7 +28,7 @@ use { solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_runtime::{ accounts_background_service::{AbsRequestSender, SnapshotRequestKind}, - bank::{Bank, TransactionBalancesSet}, + bank::{Bank, PreCommitResult, TransactionBalancesSet}, bank_forks::{BankForks, SetRootError}, bank_utils, commitment::VOTE_THRESHOLD_SIZE, @@ -54,6 +54,7 @@ use { }, solana_svm::{ transaction_commit_result::{TransactionCommitResult, TransactionCommitResultExtensions}, + transaction_processing_result::{ProcessedTransaction, TransactionProcessingResult}, transaction_processor::ExecutionRecordingConfig, }, solana_svm_transaction::{svm_message::SVMMessage, svm_transaction::SVMTransaction}, @@ -61,6 +62,7 @@ use { solana_transaction_status::token_balances::TransactionTokenBalancesSet, solana_vote::vote_account::VoteAccountsHashMap, std::{ + borrow::Cow, collections::{HashMap, HashSet}, num::Saturating, ops::{Index, Range}, @@ -107,13 +109,13 @@ fn first_err(results: &[Result<()>]) -> Result<()> { } // Includes transaction signature for unit-testing -fn get_first_error( - batch: &TransactionBatch, - commit_results: &[TransactionCommitResult], +fn do_get_first_error( + batch: &TransactionBatch, + results: &[Result], ) -> Option<(Result<()>, Signature)> { let mut first_err = None; - for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) { - if let Err(err) = commit_result { + for (result, transaction) in results.iter().zip(batch.sanitized_transactions()) { + if let Err(err) = result { if first_err.is_none() { first_err = Some((Err(err.clone()), *transaction.signature())); } @@ -134,6 +136,15 @@ fn get_first_error( first_err } +fn get_first_error( + batch: &TransactionBatch, + commit_results: &[Result], +) -> Result<()> { + do_get_first_error(batch, commit_results) + .map(|(error, _signature)| error) + .unwrap_or(Ok(())) +} + fn create_thread_pool(num_threads: usize) -> ThreadPool { rayon::ThreadPoolBuilder::new() .num_threads(num_threads) @@ -142,20 +153,26 @@ fn create_thread_pool(num_threads: usize) -> ThreadPool { .expect("new rayon threadpool") } -pub fn execute_batch( - batch: &TransactionBatchWithIndexes, - bank: &Arc, - transaction_status_sender: Option<&TransactionStatusSender>, - replay_vote_sender: Option<&ReplayVoteSender>, - timings: &mut ExecuteTimings, +pub fn execute_batch<'a>( + batch: &'a TransactionBatchWithIndexes, + bank: &'a Arc, + transaction_status_sender: Option<&'a TransactionStatusSender>, + replay_vote_sender: Option<&'a ReplayVoteSender>, + timings: &'a mut ExecuteTimings, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &'a PrioritizationFeeCache, + // None is meaningfully used to detect whether this is called from the block-producing unified + // scheduler. If so, suppress overly verbose logging for the code path.
+ extra_pre_commit_callback: Option< + impl FnOnce(&Result) -> Result>, + >, ) -> Result<()> { let TransactionBatchWithIndexes { batch, transaction_indexes, } = batch; let record_token_balances = transaction_status_sender.is_some(); + let mut transaction_indexes = Cow::from(transaction_indexes); let mut mint_decimals: HashMap = HashMap::new(); @@ -165,14 +182,53 @@ pub fn execute_batch( vec![] }; - let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), - timings, - log_messages_bytes_limit, - ); + let pre_commit_callback = |timings: &mut _, processing_results: &_| -> PreCommitResult { match extra_pre_commit_callback { None => { + get_first_error(batch, processing_results)?; + check_block_cost_limits_if_enabled(batch, bank, timings, processing_results)?; + Ok(None) + } + Some(extra_pre_commit_callback) => { + // We're entering into the block-producing unified scheduler special case... + // `processing_results` should always contain exactly one result in that case. + assert_eq!(processing_results.len(), 1); + assert!(transaction_indexes.is_empty()); + + // From now on, we need to freeze-lock the tpu bank, in order to prevent it from + // freezing in the middle of this code-path. Otherwise, the assertion at the start + // of commit_transactions() would trigger a panic because it's a fatal runtime + // invariant violation. + let freeze_lock = bank.freeze_lock(); + + if let Some(index) = extra_pre_commit_callback(&processing_results[0])? { + let transaction_indexes = transaction_indexes.to_mut(); + // Adjust the empty new vec with the exact needed capacity. Otherwise, excess + // capacity would be reserved on `.push()` in it. + transaction_indexes.reserve_exact(1); + transaction_indexes.push(index); + } + // At this point, poh recording should have succeeded, so it's guaranteed that the bank + // hasn't been frozen yet and we're still holding the lock. So, it's okay to pass + // down freeze_lock without any introspection here to be unconditionally dropped + // after commit_transactions().
This reasoning is the same as + // solana_core::banking_stage::Consumer::execute_and_commit_transactions_locked() + Ok(Some(freeze_lock)) + } + } + }; + + let (commit_results, balances) = batch + .bank() + .load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), + timings, + log_messages_bytes_limit, + pre_commit_callback, + )?; bank_utils::find_and_send_votes( batch.sanitized_transactions(), @@ -180,29 +236,12 @@ pub fn execute_batch( replay_vote_sender, ); - let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank - .feature_set - .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) - { - check_block_cost_limits(bank, &commit_results, batch.sanitized_transactions()) - } else { - Ok(()) - }); - - timings.saturating_add_in_place( - ExecuteTimingType::CheckBlockLimitsUs, - check_block_cost_limits_us, - ); - check_block_cost_limits_result?; - let committed_transactions = commit_results .iter() .zip(batch.sanitized_transactions()) .filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx)) .collect_vec(); - let first_err = get_first_error(batch, &commit_results); - if let Some(transaction_status_sender) = transaction_status_sender { let transactions: Vec = batch .sanitized_transactions() @@ -224,13 +263,13 @@ pub fn execute_batch( commit_results, balances, token_balances, - transaction_indexes.to_vec(), + transaction_indexes.into_owned(), ); } prioritization_fee_cache.update(bank, committed_transactions.into_iter()); - first_err.map(|(result, _)| result).unwrap_or(Ok(())) + Ok(()) } // collect transactions actual execution costs, subject to block limits; @@ -238,20 +277,20 @@ pub fn execute_batch( // reported to metric `replay-stage-mark_dead_slot` fn check_block_cost_limits( bank: &Bank, - commit_results: &[TransactionCommitResult], + processing_results: &[TransactionProcessingResult], sanitized_transactions: &[impl TransactionWithMeta], ) -> Result<()> { - assert_eq!(sanitized_transactions.len(), commit_results.len()); + assert_eq!(sanitized_transactions.len(), processing_results.len()); - let tx_costs_with_actual_execution_units: Vec<_> = commit_results + let tx_costs_with_actual_execution_units: Vec<_> = processing_results .iter() .zip(sanitized_transactions) - .filter_map(|(commit_result, tx)| { - if let Ok(committed_tx) = commit_result { + .filter_map(|(processing_result, tx)| { + if let Ok(processed_tx) = processing_result { Some(CostModel::calculate_cost_for_executed_transaction( tx, - committed_tx.executed_units, - committed_tx.loaded_account_stats.loaded_accounts_data_size, + processed_tx.executed_units(), + processed_tx.loaded_accounts_data_size(), &bank.feature_set, )) } else { @@ -271,6 +310,28 @@ fn check_block_cost_limits( Ok(()) } +fn check_block_cost_limits_if_enabled( + batch: &TransactionBatch, + bank: &Bank, + timings: &mut ExecuteTimings, + processing_results: &[TransactionProcessingResult], +) -> Result<()> { + let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank + .feature_set + .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) + { + check_block_cost_limits(bank, processing_results, batch.sanitized_transactions()) + } else { + Ok(()) + }); + + timings.saturating_add_in_place( + ExecuteTimingType::CheckBlockLimitsUs, + check_block_cost_limits_us, + ); + check_block_cost_limits_result +}
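The pre-commit plumbing above is the heart of this patch: the optional callback runs after transaction execution but before commit_transactions(), may atomically record the transaction to PoH, and signals success by handing back the bank's freeze lock so the bank cannot freeze mid-commit. Below is a minimal, self-contained sketch of that control flow under toy assumptions (ToyBank and a Mutex stand in for Bank::freeze_lock(), String stands in for TransactionError; none of these are the real Agave APIs):

use std::sync::{Mutex, MutexGuard};

struct ToyBank {
    // Stand-in for Bank::freeze_lock(); holding the guard keeps the bank
    // from being frozen while a commit is in flight.
    freeze_lock: Mutex<()>,
}

// Mirrors PreCommitResult<'a>: Ok(Some(guard)) means "recording succeeded,
// keep the bank unfrozen until commit_transactions() has run".
type ToyPreCommitResult<'a> = Result<Option<MutexGuard<'a, ()>>, String>;

fn load_execute_and_commit<'a>(
    bank: &'a ToyBank,
    pre_commit_callback: Option<impl FnOnce() -> ToyPreCommitResult<'a>>,
) -> Result<(), String> {
    // ... transactions are executed here ...
    let freeze_guard = match pre_commit_callback {
        // An Err here bails out before commit (CommitCancelled in the patch).
        Some(callback) => callback()?,
        // The block-verification path passes no callback, as in
        // execute_batches_internal() above.
        None => None,
    };
    // ... commit_transactions() runs here, while the guard is still held ...
    drop(freeze_guard);
    Ok(())
}

fn main() {
    let bank = ToyBank {
        freeze_lock: Mutex::new(()),
    };
    // Block-production path: the callback "records to poh", then returns the
    // freeze guard for the caller to hold across the commit.
    let result = load_execute_and_commit(
        &bank,
        Some(|| Ok(Some(bank.freeze_lock.lock().unwrap()))),
    );
    assert!(result.is_ok());
}

Returning the guard from the callback, rather than locking around the whole function, keeps the no-freeze window exactly as long as the record-then-commit sequence requires.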
#[derive(Default)] pub struct ExecuteBatchesInternalMetrics { execution_timings_per_thread: HashMap, @@ -322,6 +383,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, prioritization_fee_cache, + None:: _>, )); let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); @@ -2202,11 +2264,13 @@ pub fn process_single_slot( } #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum TransactionStatusMessage { Batch(TransactionStatusBatch), Freeze(Slot), } +#[derive(Debug)] pub struct TransactionStatusBatch { pub slot: Slot, pub transactions: Vec, @@ -2332,12 +2396,10 @@ pub mod tests { solana_sdk::{ account::{AccountSharedData, WritableAccount}, epoch_schedule::EpochSchedule, - fee::FeeDetails, hash::Hash, instruction::{Instruction, InstructionError}, native_token::LAMPORTS_PER_SOL, pubkey::Pubkey, - rent_debits::RentDebits, signature::{Keypair, Signer}, signer::SeedDerivable, system_instruction::SystemError, @@ -2345,8 +2407,9 @@ pub mod tests { transaction::{Transaction, TransactionError}, }, solana_svm::{ - transaction_commit_result::CommittedTransaction, - transaction_execution_result::TransactionLoadedAccountsStats, + account_loader::LoadedTransaction, + transaction_execution_result::{ExecutedTransaction, TransactionExecutionDetails}, + transaction_processing_result::ProcessedTransaction, transaction_processor::ExecutionRecordingConfig, }, solana_vote::vote_account::VoteAccount, @@ -2355,8 +2418,8 @@ pub mod tests { vote_state::{TowerSync, VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_transaction, }, - std::{collections::BTreeSet, sync::RwLock}, - test_case::test_case, + std::{collections::BTreeSet, slice, sync::RwLock}, + test_case::{test_case, test_matrix}, trees::tr, }; @@ -3355,17 +3418,19 @@ pub mod tests { assert_matches!(bank.transfer(4, &mint_keypair, &keypair2.pubkey()), Ok(_)); assert_matches!(bank.transfer(4, &mint_keypair, &keypair4.pubkey()), Ok(_)); + let good_tx = system_transaction::transfer( + &keypair1, + &mint_keypair.pubkey(), + 1, + bank.last_blockhash(), + ); + // construct an Entry whose 2nd transaction would cause a lock conflict with previous entry let entry_1_to_mint = next_entry( &bank.last_blockhash(), 1, vec![ - system_transaction::transfer( - &keypair1, - &mint_keypair.pubkey(), - 1, - bank.last_blockhash(), - ), + good_tx.clone(), system_transaction::transfer( &keypair4, &keypair4.pubkey(), @@ -3394,14 +3459,16 @@ pub mod tests { ], ); - assert!(process_entries_for_tests_without_scheduler( - &bank, - vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], - ) - .is_err()); + assert_matches!( + process_entries_for_tests_without_scheduler( + &bank, + vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], + ), + Err(TransactionError::BlockhashNotFound) + ); - // First transaction in first entry succeeded, so keypair1 lost 1 lamport - assert_eq!(bank.get_balance(&keypair1.pubkey()), 3); + // First transaction in first entry was rolled back, so keypair1 didn't lose 1 lamport + assert_eq!(bank.get_balance(&keypair1.pubkey()), 4); assert_eq!(bank.get_balance(&keypair2.pubkey()), 4); // Check all accounts are unlocked @@ -3417,6 +3484,16 @@ pub mod tests { for result in batch2.lock_results() { assert!(result.is_ok()); } + drop(batch2); + + // ensure good_tx will succeed and was just rolled back above due to other failing tx + let entry_3 = next_entry(&entry_2_to_3_mint_to_1.hash, 1, vec![good_tx]); + assert_matches!( + process_entries_for_tests_without_scheduler(&bank, vec![entry_3]), + Ok(()) + );
+ // First transaction in third entry succeeded, so keypair1 lost 1 lamport + assert_eq!(bank.get_balance(&keypair1.pubkey()), 3); } #[test_case(true; "rent_collected")] @@ -4392,7 +4469,7 @@ pub mod tests { &mut ExecuteTimings::default(), None, ); - let (err, signature) = get_first_error(&batch, &commit_results).unwrap(); + let (err, signature) = do_get_first_error(&batch, &commit_results).unwrap(); assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound); assert_eq!(signature, account_not_found_sig); } @@ -5041,6 +5118,133 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } + enum TxResult { + ExecutedWithSuccess, + ExecutedWithFailure, + NotExecuted, + } + + #[test_matrix( + [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted], + [Ok(None), Ok(Some(4)), Err(TransactionError::CommitCancelled)] + )] + fn test_execute_batch_pre_commit_callback( + tx_result: TxResult, + poh_result: Result>, + ) { + solana_logger::setup(); + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests(); + let bank = Arc::new(bank); + let pubkey = solana_sdk::pubkey::new_rand(); + let (tx, expected_tx_result) = match tx_result { + TxResult::ExecutedWithSuccess => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_config.hash(), + )), + Ok(()), + ), + TxResult::ExecutedWithFailure => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 100000000, + genesis_config.hash(), + )), + Ok(()), + ), + TxResult::NotExecuted => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + Hash::default(), + )), + Err(TransactionError::BlockhashNotFound), + ), + }; + let mut batch = TransactionBatch::new( + vec![Ok(()); 1], + &bank, + OwnedOrBorrowed::Borrowed(slice::from_ref(&tx)), + ); + batch.set_needs_unlock(false); + let poh_with_index = matches!(&poh_result, Ok(Some(_))); + let batch = TransactionBatchWithIndexes { + batch, + transaction_indexes: vec![], + }; + let prioritization_fee_cache = PrioritizationFeeCache::default(); + let mut timing = ExecuteTimings::default(); + let (sender, receiver) = crossbeam_channel::unbounded(); + + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + let should_commit = poh_result.is_ok(); + let mut is_called = false; + let result = execute_batch( + &batch, + &bank, + Some(&TransactionStatusSender { sender }), + None, + &mut timing, + None, + &prioritization_fee_cache, + Some(|processing_result: &'_ Result<_>| { + is_called = true; + let ok = poh_result?; + if let Err(error) = processing_result { + Err(error.clone())?; + }; + Ok(ok) + }), + ); + + // pre_commit_callback() should always be called regardless of tx_result + assert!(is_called); + + if should_commit { + assert_eq!(result, expected_tx_result); + if expected_tx_result.is_ok() { + assert_eq!(bank.transaction_count(), 1); + if matches!(tx_result, TxResult::ExecutedWithFailure) { + assert_eq!(bank.transaction_error_count(), 1); + } else { + assert_eq!(bank.transaction_error_count(), 0); + } + } else { + assert_eq!(bank.transaction_count(), 0); + } + } else { + assert_matches!(result,
Err(TransactionError::CommitCancelled)); + assert_eq!(bank.transaction_count(), 0); + } + if poh_with_index && expected_tx_result.is_ok() { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![4_usize] + ); + } else if should_commit && expected_tx_result.is_ok() { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes.is_empty() + ); + } else { + assert_matches!(receiver.try_recv(), Err(_)); + } + } + #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; @@ -5198,27 +5402,31 @@ pub mod tests { .unwrap() .set_limits(u64::MAX, block_limit, u64::MAX); let txs = vec![tx.clone(), tx]; - let commit_results = vec![ - Ok(CommittedTransaction { - status: Ok(()), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: actual_execution_cu, - fee_details: FeeDetails::default(), - rent_debits: RentDebits::default(), - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_data_size: actual_loaded_accounts_data_size, - loaded_accounts_count: 2, + let processing_results = vec![ + Ok(ProcessedTransaction::Executed(Box::new( + ExecutedTransaction { + execution_details: TransactionExecutionDetails { + status: Ok(()), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units: actual_execution_cu, + accounts_data_len_delta: 0, + }, + loaded_transaction: LoadedTransaction { + loaded_accounts_data_size: actual_loaded_accounts_data_size, + ..LoadedTransaction::default() + }, + programs_modified_by_tx: HashMap::new(), }, - }), + ))), Err(TransactionError::AccountNotFound), ]; - assert!(check_block_cost_limits(&bank, &commit_results, &txs).is_ok()); + assert!(check_block_cost_limits(&bank, &processing_results, &txs).is_ok()); assert_eq!( Err(TransactionError::WouldExceedMaxBlockCostLimit), - check_block_cost_limits(&bank, &commit_results, &txs) + check_block_cost_limits(&bank, &processing_results, &txs) ); } } diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 0fcdb217e05fae..5a16cc791b0d42 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -135,7 +135,7 @@ pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, @@ -1139,11 +1139,12 @@ impl PohRecorder { } } -pub fn create_test_recorder( +fn do_create_test_recorder( bank: Arc, blockstore: Arc, poh_config: Option, leader_schedule_cache: Option>, + track_transaction_indexes: bool, ) -> ( Arc, Arc>, @@ -1169,7 +1170,10 @@ pub fn create_test_recorder( ); let ticks_per_slot = bank.ticks_per_slot(); - poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false); + poh_recorder.set_bank( + BankWithScheduler::new_without_scheduler(bank), + track_transaction_indexes, + ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), @@ -1184,6 +1188,34 @@ pub fn create_test_recorder( (exit, poh_recorder, poh_service, entry_receiver) } +pub fn create_test_recorder( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false) +} + 
+pub fn create_test_recorder_with_index_tracking( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true) +} + #[cfg(test)] mod tests { use { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b163d9e8664892..fd4cd2c2049ed5 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6949,6 +6949,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -8379,6 +8380,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1556a459f5e0c0..0687e594a9dead 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -82,6 +82,7 @@ solana-svm-transaction = { workspace = true } solana-system-program = { workspace = true, optional = true } solana-timings = { workspace = true } solana-transaction-status-client-types = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a37aa26460cb62..11b34db594c4dd 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -249,7 +249,7 @@ struct RentMetrics { pub type BankStatusCache = StatusCache>; #[cfg_attr( feature = "frozen-abi", - frozen_abi(digest = "BHg4qpwegtaJypLUqAdjQYzYeLfEGf6tA4U5cREbHMHi") + frozen_abi(digest = "4e7a7AAsQrM5Lp5bhREdVZ5QGZfyETbBthhWjYMYb6zS") )] pub type BankSlotDelta = SlotDelta>; @@ -343,7 +343,7 @@ pub struct TransactionSimulationResult { pub inner_instructions: Option>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, @@ -360,6 +360,8 @@ impl TransactionBalancesSet { } pub type TransactionBalances = Vec>; +pub type PreCommitResult<'a> = Result>>; + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum TransactionLogCollectorFilter { All, @@ -3787,52 +3789,56 @@ impl Bank { ) -> Vec { processing_results .into_iter() - .map(|processing_result| match processing_result? { - ProcessedTransaction::Executed(executed_tx) => { - let execution_details = executed_tx.execution_details; - let LoadedTransaction { - rent_debits, - accounts: loaded_accounts, - loaded_accounts_data_size, - fee_details, - .. - } = executed_tx.loaded_transaction; - - // Rent is only collected for successfully executed transactions - let rent_debits = if execution_details.was_successful() { - rent_debits - } else { - RentDebits::default() - }; + .map(|processing_result| { + let processing_result = processing_result?; + let executed_units = processing_result.executed_units(); + let loaded_accounts_data_size = processing_result.loaded_accounts_data_size(); + + match processing_result { + ProcessedTransaction::Executed(executed_tx) => { + let execution_details = executed_tx.execution_details; + let LoadedTransaction { + rent_debits, + accounts: loaded_accounts, + fee_details, + .. 
+ } = executed_tx.loaded_transaction; + + // Rent is only collected for successfully executed transactions + let rent_debits = if execution_details.was_successful() { + rent_debits + } else { + RentDebits::default() + }; - Ok(CommittedTransaction { - status: execution_details.status, - log_messages: execution_details.log_messages, - inner_instructions: execution_details.inner_instructions, - return_data: execution_details.return_data, - executed_units: execution_details.executed_units, - fee_details, - rent_debits, + Ok(CommittedTransaction { + status: execution_details.status, + log_messages: execution_details.log_messages, + inner_instructions: execution_details.inner_instructions, + return_data: execution_details.return_data, + executed_units, + fee_details, + rent_debits, + loaded_account_stats: TransactionLoadedAccountsStats { + loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_data_size, + }, + }) + } + ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { + status: Err(fees_only_tx.load_error), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units, + rent_debits: RentDebits::default(), + fee_details: fees_only_tx.fee_details, loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_count: fees_only_tx.rollback_accounts.count(), loaded_accounts_data_size, }, - }) + }), } - ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { - status: Err(fees_only_tx.load_error), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: 0, - rent_debits: RentDebits::default(), - fee_details: fees_only_tx.fee_details, - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: fees_only_tx.rollback_accounts.count(), - loaded_accounts_data_size: fees_only_tx.rollback_accounts.data_size() - as u32, - }, - }), }) .collect() } @@ -4520,6 +4526,54 @@ impl Bank { timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, ) -> (Vec, TransactionBalancesSet) { + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + None:: _>, + ) + .unwrap() + } + + pub fn load_execute_and_commit_transactions_with_pre_commit_callback<'a>( + &'a self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: impl FnOnce( + &mut ExecuteTimings, + &[TransactionProcessingResult], + ) -> PreCommitResult<'a>, + ) -> Result<(Vec, TransactionBalancesSet)> { + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + Some(pre_commit_callback), + ) + } + + fn do_load_execute_and_commit_transactions_with_pre_commit_callback<'a>( + &'a self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: Option< + impl FnOnce(&mut ExecuteTimings, &[TransactionProcessingResult]) -> PreCommitResult<'a>, + >, + ) -> Result<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -4545,21 +4599,31 @@ impl Bank { }, ); + // pre_commit_callback could initiate an 
atomic operation (i.e. poh recording with block + // producing unified scheduler). In that case, it returns Some(freeze_lock), which should + // be unlocked only after calling commit_transactions() immediately after calling the + // callback. + let freeze_lock = if let Some(pre_commit_callback) = pre_commit_callback { + pre_commit_callback(timings, &processing_results)? + } else { + None + }; let commit_results = self.commit_transactions( batch.sanitized_transactions(), processing_results, &processed_counts, timings, ); + drop(freeze_lock); let post_balances = if collect_balances { self.collect_balances(batch) } else { vec![] }; - ( + Ok(( commit_results, TransactionBalancesSet::new(pre_balances, post_balances), - ) + )) } /// Process a Transaction. This is used for unit tests and simply calls the vector diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index fb6bb3dc79c732..69e806c5f48ce5 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -30,6 +30,7 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_timings::ExecuteTimings, + solana_unified_scheduler_logic::SchedulingMode, std::{ fmt::{self, Debug}, mem, @@ -227,13 +228,29 @@ pub type SchedulerId = u64; /// `SchedulingContext`s. #[derive(Clone, Debug)] pub struct SchedulingContext { - // mode: SchedulingMode, // this will be added later. + mode: SchedulingMode, bank: Arc, } impl SchedulingContext { pub fn new(bank: Arc) -> Self { - Self { bank } + // mode will be configurable later + Self { + mode: SchedulingMode::BlockVerification, + bank, + } + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn for_production(bank: Arc) -> Self { + Self { + mode: SchedulingMode::BlockProduction, + bank, + } + } + + pub fn mode(&self) -> SchedulingMode { + self.mode } pub fn bank(&self) -> &Arc { diff --git a/sdk/transaction-error/src/lib.rs b/sdk/transaction-error/src/lib.rs index 433a48b0122e31..2f5b72b960833f 100644 --- a/sdk/transaction-error/src/lib.rs +++ b/sdk/transaction-error/src/lib.rs @@ -137,6 +137,9 @@ pub enum TransactionError { /// Program cache hit max limit. ProgramCacheHitMaxLimit, + + /// Commit cancelled internally.
+ CommitCancelled, } impl std::error::Error for TransactionError {} @@ -220,6 +223,8 @@ impl fmt::Display for TransactionError { => f.write_str("Sum of account balances before and after transaction do not match"), Self::ProgramCacheHitMaxLimit => f.write_str("Program cache hit max limit"), + Self::CommitCancelled + => f.write_str("CommitCancelled"), } } } diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index d0fa74a2104707..5748b05655edba 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -63,6 +63,7 @@ enum TransactionErrorType { PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35; UNBALANCED_TRANSACTION = 36; PROGRAM_CACHE_HIT_MAX_LIMIT = 37; + COMMIT_CANCELLED = 38; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 13fb31d9665f2c..cd53e75cbbc7fa 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -852,6 +852,7 @@ impl TryFrom for TransactionError { 34 => TransactionError::ResanitizationNeeded, 36 => TransactionError::UnbalancedTransaction, 37 => TransactionError::ProgramCacheHitMaxLimit, + 38 => TransactionError::CommitCancelled, _ => return Err("Invalid TransactionError"), }) } @@ -973,6 +974,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::ProgramCacheHitMaxLimit => { tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit } + TransactionError::CommitCancelled => { + tx_by_addr::TransactionErrorType::CommitCancelled + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => { diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 3230817a4da1d2..f38ef7446fa02c 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6768,6 +6768,7 @@ dependencies = [ "solana-svm-transaction", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -7715,6 +7716,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/svm/src/transaction_processing_result.rs b/svm/src/transaction_processing_result.rs index c8da2a941c1a9b..52532988f8f3d7 100644 --- a/svm/src/transaction_processing_result.rs +++ b/svm/src/transaction_processing_result.rs @@ -87,4 +87,17 @@ impl ProcessedTransaction { Self::FeesOnly { .. 
} => None, } } + + pub fn executed_units(&self) -> u64 { + self.execution_details() + .map(|detail| detail.executed_units) + .unwrap_or_default() + } + + pub fn loaded_accounts_data_size(&self) -> u32 { + match self { + Self::Executed(context) => context.loaded_transaction.loaded_accounts_data_size, + Self::FeesOnly(details) => details.rollback_accounts.data_size() as u32, + } + } } diff --git a/transaction-status/src/token_balances.rs b/transaction-status/src/token_balances.rs index 85a85a053f910f..36b46552cc687f 100644 --- a/transaction-status/src/token_balances.rs +++ b/transaction-status/src/token_balances.rs @@ -2,6 +2,7 @@ use crate::TransactionTokenBalance; pub type TransactionTokenBalances = Vec>; +#[derive(Debug)] pub struct TransactionTokenBalancesSet { pub pre_token_balances: TransactionTokenBalances, pub post_token_balances: TransactionTokenBalances, diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index e8e7501fd998cd..c7b5e7a1e8ca76 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -105,6 +105,12 @@ use { std::{collections::VecDeque, mem, sync::Arc}, }; +#[derive(Clone, Copy, Debug)] +pub enum SchedulingMode { + BlockVerification, + BlockProduction, +} + /// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`]. mod utils { use std::{ diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 3b0a0df66d0ec1..b720642a91906c 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -18,6 +18,7 @@ log = { workspace = true } qualifier_attr = { workspace = true } scopeguard = { workspace = true } solana-ledger = { workspace = true } +solana-poh = { workspace = true } solana-pubkey = { workspace = true } solana-runtime = { workspace = true } solana-runtime-transaction = { workspace = true } @@ -32,10 +33,13 @@ vec_extract_if_polyfill = { workspace = true } assert_matches = { workspace = true } lazy_static = { workspace = true } solana-clock = { workspace = true } +solana-entry = { workspace = true } +solana-hash = { workspace = true } solana-keypair = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-system-transaction = { workspace = true } +test-case = { workspace = true } [features] dev-context-only-utils = [] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c496ab455e1828..f926945c769089 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -20,6 +20,7 @@ use { solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, + solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_pubkey::Pubkey, solana_runtime::{ installed_scheduler_pool::{ @@ -35,7 +36,10 @@ use { solana_timings::ExecuteTimings, solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::{TransactionError, TransactionResult as Result}, - solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, + solana_unified_scheduler_logic::{ + SchedulingMode::{BlockProduction, BlockVerification}, + SchedulingStateMachine, Task, UsageQueue, + }, static_assertions::const_assert_eq, std::{ fmt::Debug, @@ -100,6 +104,7 @@ pub struct HandlerContext { transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + transaction_recorder: Option, } pub type 
DefaultSchedulerPool = @@ -176,6 +181,8 @@ where transaction_status_sender, replay_vote_sender, prioritization_fee_cache, + // will be configurable later + transaction_recorder: None, }, weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), @@ -436,9 +443,47 @@ impl TaskHandler for DefaultTaskHandler { let index = task.task_index(); let batch = bank.prepare_unlocked_batch_from_single_tx(transaction); + let transaction_indexes = match scheduling_context.mode() { + BlockVerification => vec![index], + BlockProduction => { + // Create a placeholder vec, which will be populated later if + // transaction_status_sender is Some(_). + // transaction_status_sender is usually None for staked nodes because it's only + // used for RPC-related additional data recording. However, a staked node could + // also be running with rpc functionalities during development. So, we need to + // correctly support the use case for produced blocks as well, just like verified blocks + // via the replay stage. + // Refer to `record_token_balances` in `execute_batch()`, as this treatment is mirrored + // from it. + vec![] + } + }; let batch_with_indexes = TransactionBatchWithIndexes { batch, - transaction_indexes: vec![index], + transaction_indexes, + }; + + let pre_commit_callback = match scheduling_context.mode() { + BlockVerification => None, + BlockProduction => Some(|processing_result: &'_ Result<_>| { + if let Err(error) = processing_result { + Err(error.clone())?; + }; + + let RecordTransactionsSummary { + result, + starting_transaction_index, + .. + } = handler_context + .transaction_recorder + .as_ref() + .unwrap() + .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + match result { + Ok(()) => Ok(starting_transaction_index), + Err(_) => Err(TransactionError::CommitCancelled), + } + }), }; *result = execute_batch( @@ -449,6 +494,7 @@ impl TaskHandler for DefaultTaskHandler { timings, handler_context.log_messages_bytes_limit, &handler_context.prioritization_fee_cache, + pre_commit_callback, ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } @@ -1473,7 +1519,15 @@ mod tests { crate::sleepless_testing, assert_matches::assert_matches, solana_clock::{Slot, MAX_PROCESSING_AGE}, + solana_hash::Hash, solana_keypair::Keypair, + solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, + create_new_tmp_ledger_auto_delete, + leader_schedule_cache::LeaderScheduleCache, + }, + solana_poh::poh_recorder::create_test_recorder_with_index_tracking, solana_pubkey::Pubkey, solana_runtime::{ bank::Bank, @@ -1487,9 +1541,10 @@ mod tests { solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::TransactionError, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, thread::JoinHandle, }, + test_case::test_matrix, }; #[derive(Debug)] @@ -2937,10 +2992,146 @@ mod tests { transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, + transaction_recorder: None, }; let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default()); DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } + + enum TxResult { + ExecutedWithSuccess, + ExecutedWithFailure, + NotExecuted, + } + + #[test_matrix( + [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted], + [false, true] + )] + fn
test_task_handler_poh_recording(tx_result: TxResult, should_succeed_to_record_to_poh: bool) { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + ref mint_keypair, + .. + } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank); + let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + + let (tx, expected_tx_result) = match tx_result { + TxResult::ExecutedWithSuccess => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::ExecutedWithFailure => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1_000_000, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::NotExecuted => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + Hash::default(), + ), + Err(TransactionError::BlockhashNotFound), + ), + }; + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + let result = &mut Ok(()); + let timings = &mut ExecuteTimings::default(); + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let scheduling_context = &SchedulingContext::for_production(bank.clone()); + let (sender, receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let handler_context = &HandlerContext { + log_messages_bytes_limit: None, + transaction_status_sender: Some(TransactionStatusSender { sender }), + replay_vote_sender: None, + prioritization_fee_cache, + transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), + }; + + let task = + SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default()); + + // wait until the poh's working bank is cleared. + // also flush signal_receiver after that. + if !should_succeed_to_record_to_poh { + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + while signal_receiver.try_recv().is_ok() {} + } + + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); + + if should_succeed_to_record_to_poh { + if expected_tx_result.is_ok() { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + if matches!(tx_result, TxResult::ExecutedWithFailure) { + assert_eq!(bank.transaction_error_count(), 1); + } else { + assert_eq!(bank.transaction_error_count(), 0); + } + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch( + TransactionStatusBatch { .. 
} + )) + ); + assert_matches!( + signal_receiver.try_recv(), + Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) + if transactions == vec![tx.to_versioned_transaction()] + ); + } else { + assert_eq!(result, &expected_tx_result); + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + } else { + if expected_tx_result.is_ok() { + assert_matches!(result, Err(TransactionError::CommitCancelled)); + } else { + assert_eq!(result, &expected_tx_result); + } + + assert_eq!(bank.transaction_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } } From 2d406317acdce20a82360e584e133159a6cacd41 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 10 Jan 2025 15:45:00 +0000 Subject: [PATCH 4/5] recovers Merkle shreds using mutable references into shreds payloads (#4356) Erasure recovery for Merkle shreds copies the erasure shards out of the shreds: https://github.com/anza-xyz/agave/blob/df5c9ad28/ledger/src/shred/merkle.rs#L893 and then resizes and copies recovered shards after erasure recovery which might cause another re-allocation: https://github.com/anza-xyz/agave/blob/df5c9ad28/ledger/src/shred/merkle.rs#L157-L158 https://github.com/anza-xyz/agave/blob/df5c9ad28/ledger/src/shred/merkle.rs#L246-L247 In order to minimize allocations and memory copies, this commit instead uses mutable references into the shred payload, passing them directly as shards to the Reed-Solomon implementation. --- ledger/src/shred/common.rs | 4 +- ledger/src/shred/merkle.rs | 535 ++++++++++++++++++------------------- 2 files changed, 269 insertions(+), 270 deletions(-) diff --git a/ledger/src/shred/common.rs b/ledger/src/shred/common.rs index af05532a3e361c..abfcc0e0d67499 100644 --- a/ledger/src/shred/common.rs +++ b/ledger/src/shred/common.rs @@ -40,12 +40,14 @@ macro_rules! 
             &self.payload
         }
 
+        #[inline]
         fn into_payload(self) -> Vec<u8> {
             self.payload
         }
 
+        #[inline]
         fn set_signature(&mut self, signature: Signature) {
-            bincode::serialize_into(&mut self.payload[..], &signature).unwrap();
+            self.payload[..SIZE_OF_SIGNATURE].copy_from_slice(signature.as_ref());
             self.common_header.signature = signature;
         }
 
diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs
index 4293861f839dd1..f01f7e5903851f 100644
--- a/ledger/src/shred/merkle.rs
+++ b/ledger/src/shred/merkle.rs
@@ -29,6 +29,7 @@ use {
     },
     static_assertions::const_assert_eq,
     std::{
+        cmp::Ordering,
         io::{Cursor, Write},
         ops::Range,
         time::Instant,
@@ -85,12 +86,14 @@ pub(super) enum Shred {
 impl Shred {
     dispatch!(fn common_header(&self) -> &ShredCommonHeader);
-    dispatch!(fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
+    dispatch!(fn erasure_shard_as_slice_mut(&mut self) -> Result<&mut [u8], Error>);
     dispatch!(fn erasure_shard_index(&self) -> Result<usize, Error>);
     dispatch!(fn merkle_node(&self) -> Result<Hash, Error>);
     dispatch!(fn payload(&self) -> &Vec<u8>);
     dispatch!(fn sanitize(&self) -> Result<(), Error>);
+    dispatch!(fn set_chained_merkle_root(&mut self, chained_merkle_root: &Hash) -> Result<(), Error>);
     dispatch!(fn set_merkle_proof(&mut self, proof: &[&MerkleProofEntry]) -> Result<(), Error>);
+    dispatch!(fn set_retransmitter_signature(&mut self, signature: &Signature) -> Result<(), Error>);
     dispatch!(fn set_signature(&mut self, signature: Signature));
     dispatch!(fn signed_data(&self) -> Result<Hash, Error>);
 
@@ -109,6 +112,7 @@ impl Shred {
         }
     }
 
+    #[inline]
     fn signature(&self) -> &Signature {
         &self.common_header().signature
     }
@@ -125,6 +129,7 @@ impl Shred {
 #[cfg(test)]
 impl Shred {
     dispatch!(fn chained_merkle_root(&self) -> Result<Hash, Error>);
+    dispatch!(fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
     dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
     dispatch!(fn proof_size(&self) -> Result<u8, Error>);
 
@@ -144,48 +149,8 @@ impl Shred {
 impl ShredData {
     impl_merkle_shred!(MerkleData);
 
-    fn from_recovered_shard(
-        signature: &Signature,
-        chained_merkle_root: &Option<Hash>,
-        retransmitter_signature: &Option<Signature>,
-        mut shard: Vec<u8>,
-    ) -> Result<Self, Error> {
-        let shard_size = shard.len();
-        if shard_size + SIZE_OF_SIGNATURE > Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidShardSize(shard_size));
-        }
-        shard.resize(Self::SIZE_OF_PAYLOAD, 0u8);
-        shard.copy_within(0..shard_size, SIZE_OF_SIGNATURE);
-        shard[0..SIZE_OF_SIGNATURE].copy_from_slice(signature.as_ref());
-        // Deserialize headers.
-        let mut cursor = Cursor::new(&shard[..]);
-        let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
-        let ShredVariant::MerkleData {
-            proof_size,
-            chained,
-            resigned,
-        } = common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        if ShredCode::capacity(proof_size, chained, resigned)? != shard_size {
-            return Err(Error::InvalidShardSize(shard_size));
-        }
-        let data_header = deserialize_from_with_limit(&mut cursor)?;
-        let mut shred = Self {
-            common_header,
-            data_header,
-            payload: shard,
-        };
-        if let Some(chained_merkle_root) = chained_merkle_root {
-            shred.set_chained_merkle_root(chained_merkle_root)?;
-        }
-        if let Some(signature) = retransmitter_signature {
-            shred.set_retransmitter_signature(signature)?;
-        }
-        shred.sanitize()?;
-        Ok(shred)
-    }
+    // Offset into the payload where the erasure coded slice begins.
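+    // For data shreds the shard starts right after the leader signature;
+    // the trailing Merkle proof section of the payload is not erasure coded.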
+    const ERASURE_SHARD_START_OFFSET: usize = SIZE_OF_SIGNATURE;
 
     pub(super) fn get_merkle_root(
         shred: &[u8],
@@ -221,47 +186,8 @@ impl ShredData {
 impl ShredCode {
     impl_merkle_shred!(MerkleCode);
 
-    fn from_recovered_shard(
-        common_header: ShredCommonHeader,
-        coding_header: CodingShredHeader,
-        chained_merkle_root: &Option<Hash>,
-        retransmitter_signature: &Option<Signature>,
-        mut shard: Vec<u8>,
-    ) -> Result<Self, Error> {
-        let ShredVariant::MerkleCode {
-            proof_size,
-            chained,
-            resigned,
-        } = common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        let shard_size = shard.len();
-        if Self::capacity(proof_size, chained, resigned)? != shard_size {
-            return Err(Error::InvalidShardSize(shard_size));
-        }
-        if shard_size + Self::SIZE_OF_HEADERS > Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidShardSize(shard_size));
-        }
-        shard.resize(Self::SIZE_OF_PAYLOAD, 0u8);
-        shard.copy_within(0..shard_size, Self::SIZE_OF_HEADERS);
-        let mut cursor = Cursor::new(&mut shard[..]);
-        bincode::serialize_into(&mut cursor, &common_header)?;
-        bincode::serialize_into(&mut cursor, &coding_header)?;
-        let mut shred = Self {
-            common_header,
-            coding_header,
-            payload: shard,
-        };
-        if let Some(chained_merkle_root) = chained_merkle_root {
-            shred.set_chained_merkle_root(chained_merkle_root)?;
-        }
-        if let Some(signature) = retransmitter_signature {
-            shred.set_retransmitter_signature(signature)?;
-        }
-        shred.sanitize()?;
-        Ok(shred)
-    }
+    // Offset into the payload where the erasure coded slice begins.
+    const ERASURE_SHARD_START_OFFSET: usize = Self::SIZE_OF_HEADERS;
 
     pub(super) fn get_merkle_root(
         shred: &[u8],
@@ -425,10 +351,10 @@ macro_rules! impl_merkle_shred {
                     .get_mut(proof_offset..)
                     .ok_or(Error::InvalidProofSize(proof_size))?,
             );
-            for entry in proof {
-                bincode::serialize_into(&mut cursor, entry)?;
-            }
-            Ok(())
+            proof
+                .iter()
+                .try_for_each(|entry| cursor.write_all(&entry[..]))
+                .map_err(Error::from)
         }
 
         pub(super) fn retransmitter_signature(&self) -> Result<Signature, Error> {
@@ -472,6 +398,48 @@ macro_rules! impl_merkle_shred {
             let proof_offset = Self::get_proof_offset(proof_size, chained, resigned)?;
             Ok(proof_offset + usize::from(proof_size) * SIZE_OF_MERKLE_PROOF_ENTRY)
         }
+
+        // Returns the offsets into the payload which are erasure coded.
+        fn erasure_shard_offsets(&self) -> Result<Range<usize>, Error> {
+            if self.payload.len() != Self::SIZE_OF_PAYLOAD {
+                return Err(Error::InvalidPayloadSize(self.payload.len()));
+            }
+            let ShredVariant::$variant {
+                proof_size,
+                chained,
+                resigned,
+            } = self.common_header.shred_variant
+            else {
+                return Err(Error::InvalidShredVariant);
+            };
+            let offset = Self::SIZE_OF_HEADERS + Self::capacity(proof_size, chained, resigned)?;
+            Ok(Self::ERASURE_SHARD_START_OFFSET..offset)
+        }
+
+        // Returns the erasure coded slice as an owned Vec<u8>.
+        fn erasure_shard(self) -> Result<Vec<u8>, Error> {
+            let Range { start, end } = self.erasure_shard_offsets()?;
+            let mut shard = self.payload;
+            shard.truncate(end);
+            shard.drain(..start);
+            Ok(shard)
+        }
+
+        // Returns the erasure coded slice as an immutable reference.
+        fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
+            self.payload
+                .get(self.erasure_shard_offsets()?)
+                .ok_or(Error::InvalidPayloadSize(self.payload.len()))
+        }
+
+        // Returns the erasure coded slice as a mutable reference.
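+        // This is what lets Reed-Solomon recovery write reconstructed shards
+        // directly into the shred payloads instead of copying them out and back.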
+        fn erasure_shard_as_slice_mut(&mut self) -> Result<&mut [u8], Error> {
+            let offsets = self.erasure_shard_offsets()?;
+            let payload_size = self.payload.len();
+            self.payload
+                .get_mut(offsets)
+                .ok_or(Error::InvalidPayloadSize(payload_size))
+        }
     };
 }
 
@@ -498,12 +466,11 @@ impl<'a> ShredTrait<'a> for ShredData {
             return Err(Error::InvalidPayloadSize(payload.len()));
         }
         payload.truncate(Self::SIZE_OF_PAYLOAD);
-        let mut cursor = Cursor::new(&payload[..]);
-        let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
+        let (common_header, data_header): (ShredCommonHeader, _) =
+            deserialize_from_with_limit(&payload[..])?;
         if !matches!(common_header.shred_variant, ShredVariant::MerkleData { .. }) {
             return Err(Error::InvalidShredVariant);
         }
-        let data_header = deserialize_from_with_limit(&mut cursor)?;
         let shred = Self {
             common_header,
             data_header,
@@ -521,40 +488,11 @@ impl<'a> ShredTrait<'a> for ShredData {
     }
 
     fn erasure_shard(self) -> Result<Vec<u8>, Error> {
-        if self.payload.len() != Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidPayloadSize(self.payload.len()));
-        }
-        let ShredVariant::MerkleData {
-            proof_size,
-            chained,
-            resigned,
-        } = self.common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        let offset = Self::SIZE_OF_HEADERS + Self::capacity(proof_size, chained, resigned)?;
-        let mut shard = self.payload;
-        shard.truncate(offset);
-        shard.drain(..SIZE_OF_SIGNATURE);
-        Ok(shard)
+        Self::erasure_shard(self)
     }
 
     fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
-        if self.payload.len() != Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidPayloadSize(self.payload.len()));
-        }
-        let ShredVariant::MerkleData {
-            proof_size,
-            chained,
-            resigned,
-        } = self.common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        let offset = Self::SIZE_OF_HEADERS + Self::capacity(proof_size, chained, resigned)?;
-        self.payload
-            .get(SIZE_OF_SIGNATURE..offset)
-            .ok_or(Error::InvalidPayloadSize(self.payload.len()))
+        Self::erasure_shard_as_slice(self)
     }
 
     fn sanitize(&self) -> Result<(), Error> {
@@ -579,12 +517,11 @@ impl<'a> ShredTrait<'a> for ShredCode {
     const SIZE_OF_HEADERS: usize = SIZE_OF_CODING_SHRED_HEADERS;
 
     fn from_payload(mut payload: Vec<u8>) -> Result<Self, Error> {
-        let mut cursor = Cursor::new(&payload[..]);
-        let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
+        let (common_header, coding_header): (ShredCommonHeader, _) =
+            deserialize_from_with_limit(&payload[..])?;
         if !matches!(common_header.shred_variant, ShredVariant::MerkleCode { .. }) {
             return Err(Error::InvalidShredVariant);
         }
-        let coding_header = deserialize_from_with_limit(&mut cursor)?;
         // see: https://github.com/solana-labs/solana/pull/10109
         if payload.len() < Self::SIZE_OF_PAYLOAD {
             return Err(Error::InvalidPayloadSize(payload.len()));
         }
@@ -607,40 +544,11 @@ impl<'a> ShredTrait<'a> for ShredCode {
     }
 
     fn erasure_shard(self) -> Result<Vec<u8>, Error> {
-        if self.payload.len() != Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidPayloadSize(self.payload.len()));
-        }
-        let ShredVariant::MerkleCode {
-            proof_size,
-            chained,
-            resigned,
-        } = self.common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        let offset = Self::SIZE_OF_HEADERS + Self::capacity(proof_size, chained, resigned)?;
-        let mut shard = self.payload;
-        shard.truncate(offset);
-        shard.drain(..Self::SIZE_OF_HEADERS);
-        Ok(shard)
+        Self::erasure_shard(self)
     }
 
     fn erasure_shard_as_slice(&self) -> Result<&[u8], Error> {
-        if self.payload.len() != Self::SIZE_OF_PAYLOAD {
-            return Err(Error::InvalidPayloadSize(self.payload.len()));
-        }
-        let ShredVariant::MerkleCode {
-            proof_size,
-            chained,
-            resigned,
-        } = self.common_header.shred_variant
-        else {
-            return Err(Error::InvalidShredVariant);
-        };
-        let offset = Self::SIZE_OF_HEADERS + Self::capacity(proof_size, chained, resigned)?;
-        self.payload
-            .get(Self::SIZE_OF_HEADERS..offset)
-            .ok_or(Error::InvalidPayloadSize(self.payload.len()))
+        Self::erasure_shard_as_slice(self)
    }
 
     fn sanitize(&self) -> Result<(), Error> {
@@ -780,41 +688,42 @@ fn make_merkle_proof(
 }
 
 pub(super) fn recover(
-    shreds: Vec<Shred>,
+    mut shreds: Vec<Shred>,
     reed_solomon_cache: &ReedSolomonCache,
 ) -> Result<impl Iterator<Item = Shred>, Error> {
-    // Grab {common, coding} headers from first coding shred.
+    // Sort shreds by their erasure shard index.
+    // In particular this places all data shreds before coding shreds.
+    let is_sorted = |(a, b)| cmp_shred_erasure_shard_index(a, b).is_le();
+    if !shreds.iter().tuple_windows().all(is_sorted) {
+        shreds.sort_unstable_by(cmp_shred_erasure_shard_index);
+    }
+    // Grab {common, coding} headers from the last coding shred.
     // Incoming shreds are resigned immediately after signature verification,
     // so we can just grab the retransmitter signature from one of the
     // available shreds and attach it to the recovered shreds.
-    let (common_header, coding_header, merkle_root, chained_merkle_root, retransmitter_signature) =
-        shreds
-            .iter()
-            .find_map(|shred| {
-                let Shred::ShredCode(shred) = shred else {
-                    return None;
-                };
-                let merkle_root = shred.merkle_root().ok()?;
-                let chained_merkle_root = shred.chained_merkle_root().ok();
-                let retransmitter_signature = shred.retransmitter_signature().ok();
-                let position = u32::from(shred.coding_header.position);
-                let common_header = ShredCommonHeader {
-                    index: shred.common_header.index.checked_sub(position)?,
-                    ..shred.common_header
-                };
-                let coding_header = CodingShredHeader {
-                    position: 0u16,
-                    ..shred.coding_header
-                };
-                Some((
-                    common_header,
-                    coding_header,
-                    merkle_root,
-                    chained_merkle_root,
-                    retransmitter_signature,
-                ))
-            })
-            .ok_or(TooFewParityShards)?;
+    let (common_header, coding_header, merkle_root, chained_merkle_root, retransmitter_signature) = {
+        // The last shred must be a coding shred by the above sorting logic.
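+        // If it is not, no coding shreds were provided at all and recovery
+        // cannot proceed.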
+        let Some(Shred::ShredCode(shred)) = shreds.last() else {
+            return Err(Error::from(TooFewParityShards));
+        };
+        let position = u32::from(shred.coding_header.position);
+        let index = shred.common_header.index.checked_sub(position);
+        let common_header = ShredCommonHeader {
+            index: index.ok_or(Error::from(InvalidIndex))?,
+            ..shred.common_header
+        };
+        let coding_header = CodingShredHeader {
+            position: 0u16,
+            ..shred.coding_header
+        };
+        (
+            common_header,
+            coding_header,
+            shred.merkle_root()?,
+            shred.chained_merkle_root().ok(),
+            shred.retransmitter_signature().ok(),
+        )
+    };
     debug_assert_matches!(common_header.shred_variant, ShredVariant::MerkleCode { .. });
     let (proof_size, chained, resigned) = match common_header.shred_variant {
         ShredVariant::MerkleCode {
@@ -870,9 +779,21 @@ pub(super) fn recover(
     let num_data_shreds = usize::from(coding_header.num_data_shreds);
     let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
     let num_shards = num_data_shreds + num_coding_shreds;
-    // Obtain erasure encoded shards from shreds.
-    let shreds = {
-        let mut batch = vec![None; num_shards];
+    // Identify which shreds are missing and create stub shreds in their place.
+    let mut mask = vec![false; num_shards];
+    let mut shreds = {
+        let make_stub_shred = |erasure_shard_index| {
+            make_stub_shred(
+                erasure_shard_index,
+                &common_header,
+                &coding_header,
+                &chained_merkle_root,
+                &retransmitter_signature,
+            )
+        };
+        let mut batch = Vec::with_capacity(num_shards);
+        // By the sorting logic above, this visits shreds in the order of
+        // their erasure shard index.
         for shred in shreds {
             // The leader signs the Merkle root and shreds in the same erasure
             // batch have the same Merkle root. So the signatures are the same
@@ -880,85 +801,58 @@ pub(super) fn recover(
             if shred.signature() != &common_header.signature {
                 return Err(Error::InvalidMerkleRoot);
             }
-            let index = match shred.erasure_shard_index() {
-                Ok(index) if index < batch.len() => index,
-                _ => return Err(Error::from(InvalidIndex)),
-            };
-            batch[index] = Some(shred);
+            let erasure_shard_index = shred.erasure_shard_index()?;
+            if !(batch.len()..num_shards).contains(&erasure_shard_index) {
+                return Err(Error::from(InvalidIndex));
+            }
+            // Push stub shreds as placeholders for the missing shreds in
+            // between.
+            while batch.len() < erasure_shard_index {
+                batch.push(make_stub_shred(batch.len())?);
+            }
+            mask[erasure_shard_index] = true;
+            batch.push(shred);
+        }
+        // Push stub shreds as placeholders for the missing shreds at the end.
+        while batch.len() < num_shards {
+            batch.push(make_stub_shred(batch.len())?);
         }
         batch
     };
-    let mut shards: Vec<Option<Vec<u8>>> = shreds
-        .iter()
-        .map(|shred| Some(shred.as_ref()?.erasure_shard_as_slice().ok()?.to_vec()))
-        .collect();
+    // Obtain erasure encoded shards from the shreds and reconstruct shreds.
+    let mut shards: Vec<(&mut [u8], bool)> = shreds
+        .iter_mut()
+        .zip(&mask)
+        .map(|(shred, &mask)| Ok((shred.erasure_shard_as_slice_mut()?, mask)))
+        .collect::<Result<_, Error>>()?;
     reed_solomon_cache
         .get(num_data_shreds, num_coding_shreds)?
         .reconstruct(&mut shards)?;
-    let mask: Vec<_> = shreds.iter().map(Option::is_some).collect();
-    // Reconstruct code and data shreds from erasure encoded shards.
-    let mut shreds: Vec<_> = shreds
-        .into_iter()
-        .zip(shards)
+    // Verify and sanitize recovered shreds, re-compute the Merkle tree and set
+    // the merkle proof on the recovered shreds.
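+    // Shreds that were present in the input (mask == true) were already
+    // verified on receipt; only the reconstructed ones need these checks.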
+    let nodes: Vec<_> = shreds
+        .iter_mut()
+        .zip(&mask)
         .enumerate()
-        .map(|(index, (shred, shard))| {
-            if let Some(shred) = shred {
-                return Ok(shred);
-            }
-            let shard = shard.ok_or(TooFewShards)?;
-            if index < num_data_shreds {
-                let shred = ShredData::from_recovered_shard(
-                    &common_header.signature,
-                    &chained_merkle_root,
-                    &retransmitter_signature,
-                    shard,
-                )?;
-                let ShredCommonHeader {
-                    signature: _,
-                    shred_variant,
-                    slot,
-                    index: _,
-                    version,
-                    fec_set_index,
-                } = shred.common_header;
-                let expected_shred_variant = ShredVariant::MerkleData {
-                    proof_size,
-                    chained,
-                    resigned,
-                };
-                if shred_variant != expected_shred_variant
-                    || common_header.slot != slot
-                    || common_header.version != version
-                    || common_header.fec_set_index != fec_set_index
-                {
+        .map(|(index, (shred, mask))| {
+            if !mask {
+                if index < num_data_shreds {
+                    let Shred::ShredData(shred) = shred else {
+                        return Err(Error::InvalidRecoveredShred);
+                    };
+                    let (common_header, data_header) =
+                        deserialize_from_with_limit(&shred.payload[..])?;
+                    if shred.common_header != common_header {
+                        return Err(Error::InvalidRecoveredShred);
+                    }
+                    shred.data_header = data_header;
+                } else if !matches!(shred, Shred::ShredCode(_)) {
                     return Err(Error::InvalidRecoveredShred);
                 }
-                Ok(Shred::ShredData(shred))
-            } else {
-                let offset = index - num_data_shreds;
-                let coding_header = CodingShredHeader {
-                    position: offset as u16,
-                    ..coding_header
-                };
-                let common_header = ShredCommonHeader {
-                    index: common_header.index + offset as u32,
-                    ..common_header
-                };
-                let shred = ShredCode::from_recovered_shard(
-                    common_header,
-                    coding_header,
-                    &chained_merkle_root,
-                    &retransmitter_signature,
-                    shard,
-                )?;
-                Ok(Shred::ShredCode(shred))
+                shred.sanitize()?;
             }
+            shred.merkle_node()
         })
-        .collect::<Result<_, Error>>()?;
-    // Compute merkle tree and set the merkle proof on the recovered shreds.
-    let nodes: Vec<_> = shreds
-        .iter()
-        .map(Shred::merkle_node)
         .collect::<Result<_, Error>>()?;
     let tree = make_merkle_tree(nodes);
     // The attached signature verifies only if we obtain the same Merkle root.
@@ -978,7 +872,7 @@
             }
         } else {
             shred.set_merkle_proof(&proof)?;
-            // Already sanitized in Shred{Code,Data}::from_recovered_shard.
+            // Already sanitized after reconstruct.
             debug_assert_matches!(shred.sanitize(), Ok(()));
             // Assert that shred payload is fully populated.
             debug_assert_eq!(shred, {
@@ -994,6 +888,101 @@
         .map(|(shred, _)| shred))
 }
 
+// Compares shreds of the same erasure batch by their erasure shard
+// index within the batch.
+#[inline]
+fn cmp_shred_erasure_shard_index(a: &Shred, b: &Shred) -> Ordering {
+    debug_assert_eq!(
+        a.common_header().fec_set_index,
+        b.common_header().fec_set_index
+    );
+    // Ordering by erasure shard index is equivalent to:
+    // * ShredType::Data < ShredType::Code.
+    // * Tie break by shred index.
+    match (a, b) {
+        (Shred::ShredCode(_), Shred::ShredData(_)) => Ordering::Greater,
+        (Shred::ShredData(_), Shred::ShredCode(_)) => Ordering::Less,
+        (Shred::ShredCode(a), Shred::ShredCode(b)) => {
+            a.common_header.index.cmp(&b.common_header.index)
+        }
+        (Shred::ShredData(a), Shred::ShredData(b)) => {
+            a.common_header.index.cmp(&b.common_header.index)
+        }
+    }
+}
+
+// Creates a minimally populated shred which will be a placeholder for a
+// missing shred when running erasure recovery. This allows us to obtain
+// mutable references to erasure coded slices within the shreds and reconstruct
+// shreds in place.
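+// Only the bytes outside the erasure coded region are populated here; the
+// erasure coded slice itself is filled in by Reed-Solomon reconstruction.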
+fn make_stub_shred(
+    erasure_shard_index: usize,
+    common_header: &ShredCommonHeader,
+    coding_header: &CodingShredHeader,
+    chained_merkle_root: &Option<Hash>,
+    retransmitter_signature: &Option<Signature>,
+) -> Result<Shred, Error> {
+    let num_data_shreds = usize::from(coding_header.num_data_shreds);
+    let mut shred = if let Some(position) = erasure_shard_index.checked_sub(num_data_shreds) {
+        let position = u16::try_from(position).map_err(|_| Error::from(InvalidIndex))?;
+        let common_header = ShredCommonHeader {
+            index: common_header.index + u32::from(position),
+            ..*common_header
+        };
+        let coding_header = CodingShredHeader {
+            position,
+            ..*coding_header
+        };
+        // For coding shreds {common,coding} headers are not part of the
+        // erasure coded slice and need to be written to the payload here.
+        let mut payload = vec![0u8; ShredCode::SIZE_OF_PAYLOAD];
+        bincode::serialize_into(&mut payload[..], &(&common_header, &coding_header))?;
+        Shred::ShredCode(ShredCode {
+            common_header,
+            coding_header,
+            payload,
+        })
+    } else {
+        let ShredVariant::MerkleCode { proof_size, .. } = common_header.shred_variant else {
+            return Err(Error::InvalidShredVariant);
+        };
+        let shred_variant = ShredVariant::MerkleData {
+            proof_size,
+            chained: chained_merkle_root.is_some(),
+            resigned: retransmitter_signature.is_some(),
+        };
+        let index = common_header.fec_set_index
+            + u32::try_from(erasure_shard_index).map_err(|_| InvalidIndex)?;
+        let common_header = ShredCommonHeader {
+            shred_variant,
+            index,
+            ..*common_header
+        };
+        // Data header will be overwritten from the recovered shard.
+        let data_header = DataShredHeader {
+            parent_offset: 0u16,
+            flags: ShredFlags::empty(),
+            size: 0u16,
+        };
+        // For data shreds only the signature part of the {common,data} headers
+        // is not erasure coded and it needs to be written to the payload here.
+        let mut payload = vec![0u8; ShredData::SIZE_OF_PAYLOAD];
+        payload[..SIZE_OF_SIGNATURE].copy_from_slice(common_header.signature.as_ref());
+        Shred::ShredData(ShredData {
+            common_header,
+            data_header,
+            payload,
+        })
+    };
+    if let Some(chained_merkle_root) = chained_merkle_root {
+        shred.set_chained_merkle_root(chained_merkle_root)?;
+    }
+    if let Some(signature) = retransmitter_signature {
+        shred.set_retransmitter_signature(signature)?;
+    }
+    Ok(shred)
+}
+
 // Maps number of (code + data) shreds to merkle_proof.len().
 fn get_proof_size(num_shreds: usize) -> u8 {
     let bits = usize::BITS - num_shreds.leading_zeros();
@@ -1155,9 +1144,10 @@ pub(super) fn make_shreds_from_data(
     // Write common and data headers into data shreds' payload buffer.
     thread_pool.install(|| {
         shreds.par_iter_mut().try_for_each(|shred| {
-            let mut cursor = Cursor::new(&mut shred.payload[..]);
-            bincode::serialize_into(&mut cursor, &shred.common_header)?;
-            bincode::serialize_into(&mut cursor, &shred.data_header)
+            bincode::serialize_into(
+                &mut shred.payload[..],
+                &(&shred.common_header, &shred.data_header),
+            )
         })
     })?;
     stats.gen_data_elapsed += now.elapsed().as_micros() as u64;
@@ -1307,8 +1297,7 @@ fn make_erasure_batch(
     for code in parity {
         let mut payload = vec![0u8; ShredCode::SIZE_OF_PAYLOAD];
         let mut cursor = Cursor::new(&mut payload[..]);
-        bincode::serialize_into(&mut cursor, &common_header)?;
-        bincode::serialize_into(&mut cursor, &coding_header)?;
+        bincode::serialize_into(&mut cursor, &(&common_header, &coding_header))?;
         cursor.write_all(&code)?;
         if let Some(chained_merkle_root) = chained_merkle_root {
             cursor.write_all(chained_merkle_root.as_ref())?;
@@ -1472,6 +1461,18 @@ mod test {
         }
     }
 
+    #[test_case(19, false, false)]
+    #[test_case(19, true, false)]
+    #[test_case(19, true, true)]
+    #[test_case(31, false, false)]
+    #[test_case(31, true, false)]
+    #[test_case(31, true, true)]
+    #[test_case(32, false, false)]
+    #[test_case(32, true, false)]
+    #[test_case(32, true, true)]
+    #[test_case(33, false, false)]
+    #[test_case(33, true, false)]
+    #[test_case(33, true, true)]
     #[test_case(37, false, false)]
     #[test_case(37, true, false)]
     #[test_case(37, true, true)]
@@ -1546,9 +1547,7 @@ mod test {
             ..data_header
         };
         let mut payload = vec![0u8; ShredData::SIZE_OF_PAYLOAD];
-        let mut cursor = Cursor::new(&mut payload[..]);
-        bincode::serialize_into(&mut cursor, &common_header).unwrap();
-        bincode::serialize_into(&mut cursor, &data_header).unwrap();
+        bincode::serialize_into(&mut payload[..], &(&common_header, &data_header)).unwrap();
         rng.fill(&mut payload[ShredData::SIZE_OF_HEADERS..size]);
         let shred = ShredData {
             common_header,
@@ -1583,9 +1582,7 @@ mod test {
             ..coding_header
         };
         let mut payload = vec![0u8; ShredCode::SIZE_OF_PAYLOAD];
-        let mut cursor = Cursor::new(&mut payload[..]);
-        bincode::serialize_into(&mut cursor, &common_header).unwrap();
-        bincode::serialize_into(&mut cursor, &coding_header).unwrap();
+        bincode::serialize_into(&mut payload[..], &(&common_header, &coding_header)).unwrap();
         payload[ShredCode::SIZE_OF_HEADERS..ShredCode::SIZE_OF_HEADERS + code.len()]
             .copy_from_slice(&code);
         let shred = ShredCode {

From cb9451cd44279941892e148fd3b665b3dcb0a5c5 Mon Sep 17 00:00:00 2001
From: HaoranYi <219428+HaoranYi@users.noreply.github.com>
Date: Fri, 10 Jan 2025 09:56:19 -0600
Subject: [PATCH 5/5] Optimize max_flushed_root update in flushing roots (#4320)

* optimize accounts_cache max_root tracking in flush_roots

* better comments

* remove max_root check

---------

Co-authored-by: HaoranYi
---
 accounts-db/src/accounts_cache.rs |  5 +----
 accounts-db/src/accounts_db.rs    | 23 +++++++++++++----------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/accounts-db/src/accounts_cache.rs b/accounts-db/src/accounts_cache.rs
index babf5336bd07a7..f2179c6475f9c1 100644
--- a/accounts-db/src/accounts_cache.rs
+++ b/accounts-db/src/accounts_cache.rs
@@ -234,10 +234,7 @@ impl AccountsCache {
     }
 
     pub fn add_root(&self, root: Slot) {
-        let max_flushed_root = self.fetch_max_flush_root();
-        if root > max_flushed_root || (root == max_flushed_root && root == 0) {
-            self.maybe_unflushed_roots.write().unwrap().insert(root);
-        }
+        self.maybe_unflushed_roots.write().unwrap().insert(root);
     }
 
     pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs
index 1a67995a62e4f4..1bdd7601585a6f 100644
--- a/accounts-db/src/accounts_db.rs
+++ b/accounts-db/src/accounts_db.rs
@@ -6296,30 +6296,33 @@ impl AccountsDb {
         });
 
         // Always flush up to `requested_flush_root`, which is necessary for things like snapshotting.
-        let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
+        let flushed_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
 
         // Iterate from highest to lowest so that we don't need to flush earlier
         // outdated updates in earlier roots
         let mut num_roots_flushed = 0;
         let mut flush_stats = FlushStats::default();
-        for &root in cached_roots.iter().rev() {
+        for &root in flushed_roots.iter().rev() {
             if let Some(stats) =
                 self.flush_slot_cache_with_clean(root, should_flush_f.as_mut(), max_clean_root)
             {
                 num_roots_flushed += 1;
                 flush_stats.accumulate(&stats);
             }
+        }
 
-            // Regardless of whether this slot was *just* flushed from the cache by the above
-            // `flush_slot_cache()`, we should update the `max_flush_root`.
-            // This is because some rooted slots may be flushed to storage *before* they are marked as root.
-            // This can occur for instance when
-            // the cache is overwhelmed, we flushed some yet to be rooted frozen slots
-            // These slots may then *later* be marked as root, so we still need to handle updating the
-            // `max_flush_root` in the accounts cache.
+        // Note that self.flush_slot_cache_with_clean() can return None if the
+        // slot has already been flushed. This can happen if the cache is
+        // overwhelmed and we flushed some yet-to-be-rooted frozen slots.
+        // However, independent of whether the last slot was actually flushed
+        // from the cache by the above loop, we should always update
+        // `max_flush_root` to the max of the flushed roots, because
+        // `max_flushed_root` tracks the last root logically flushed to
+        // storage, which snapshotting relies on.
+        if let Some(&root) = flushed_roots.last() {
             self.accounts_cache.set_max_flush_root(root);
         }
 
-        let num_new_roots = cached_roots.len();
+        let num_new_roots = flushed_roots.len();
         (num_new_roots, num_roots_flushed, flush_stats)
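For readers following PATCH 4/5, the in-place recovery hinges on the reed-solomon-erasure crate (which ReedSolomonCache appears to wrap) accepting shards as (&mut [u8], bool) pairs, where the flag marks whether a shard is present. The sketch below is editorial, not Agave code: the 3+2 batch shape, the 8-byte shard size, and the shard contents are invented purely for illustration.

use reed_solomon_erasure::galois_8::ReedSolomon;

fn main() -> Result<(), reed_solomon_erasure::Error> {
    // Illustrative sketch only: a 3 data + 2 parity batch of 8-byte shards.
    const SHARD_SIZE: usize = 8;
    let rs = ReedSolomon::new(3, 2)?;
    let mut shards: Vec<[u8; SHARD_SIZE]> = vec![
        *b"shard-00",
        *b"shard-01",
        *b"shard-02",
        [0; SHARD_SIZE], // parity, filled by encode()
        [0; SHARD_SIZE], // parity, filled by encode()
    ];
    rs.encode(&mut shards)?;
    let expected = shards.clone();
    // Lose one data shard and one parity shard.
    shards[1] = [0; SHARD_SIZE];
    shards[4] = [0; SHARD_SIZE];
    let mask = [true, false, true, true, false]; // which shards are present
    {
        // (&mut [u8], bool) implements ReconstructShard, so recovery writes
        // the missing shards back into the existing buffers: no per-shard
        // Vec allocations and no copy-out/copy-back.
        let mut work: Vec<(&mut [u8], bool)> = shards
            .iter_mut()
            .zip(mask)
            .map(|(shard, present)| (&mut shard[..], present))
            .collect();
        rs.reconstruct(&mut work)?;
    }
    assert_eq!(shards, expected);
    Ok(())
}

This is the same shape as the Vec<(&mut [u8], bool)> built in recover() above, except that there each mutable slice points into a shred payload, with stub shreds standing in for the missing ones.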