diff --git a/Cargo.lock b/Cargo.lock index a9b0f0e2fb25b8..88da797360e920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8853,6 +8853,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -10124,9 +10125,12 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-clock", + "solana-entry", + "solana-hash", "solana-keypair", "solana-ledger", "solana-logger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", @@ -10136,6 +10140,7 @@ dependencies = [ "solana-transaction-error", "solana-unified-scheduler-logic", "static_assertions", + "test-case", "vec_extract_if_polyfill", ] diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 44f65db1d54fd4..c4d39cb1813b73 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -162,6 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, + None:: _>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ac6b578c193e6e..6f0f35d8f4bb7a 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -28,7 +28,7 @@ use { solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_runtime::{ accounts_background_service::{AbsRequestSender, SnapshotRequestKind}, - bank::{Bank, TransactionBalancesSet}, + bank::{Bank, PreCommitResult, TransactionBalancesSet}, bank_forks::{BankForks, SetRootError}, bank_utils, commitment::VOTE_THRESHOLD_SIZE, @@ -54,6 +54,7 @@ use { }, solana_svm::{ transaction_commit_result::{TransactionCommitResult, TransactionCommitResultExtensions}, + transaction_processing_result::{ProcessedTransaction, TransactionProcessingResult}, transaction_processor::ExecutionRecordingConfig, }, solana_svm_transaction::{svm_message::SVMMessage, svm_transaction::SVMTransaction}, @@ -61,6 +62,7 @@ use { solana_transaction_status::token_balances::TransactionTokenBalancesSet, solana_vote::vote_account::VoteAccountsHashMap, std::{ + borrow::Cow, collections::{HashMap, HashSet}, num::Saturating, ops::{Index, Range}, @@ -107,13 +109,13 @@ fn first_err(results: &[Result<()>]) -> Result<()> { } // Includes transaction signature for unit-testing -fn get_first_error( - batch: &TransactionBatch, - commit_results: &[TransactionCommitResult], +fn do_get_first_error( + batch: &TransactionBatch, + results: &[Result], ) -> Option<(Result<()>, Signature)> { let mut first_err = None; - for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) { - if let Err(err) = commit_result { + for (result, transaction) in results.iter().zip(batch.sanitized_transactions()) { + if let Err(err) = result { if first_err.is_none() { first_err = Some((Err(err.clone()), *transaction.signature())); } @@ -134,6 +136,15 @@ fn get_first_error( first_err } +fn get_first_error( + batch: &TransactionBatch, + commit_results: &[Result], +) -> Result<()> { + do_get_first_error(batch, commit_results) + .map(|(error, _signature)| error) + .unwrap_or(Ok(())) +} + fn create_thread_pool(num_threads: usize) -> ThreadPool { rayon::ThreadPoolBuilder::new() .num_threads(num_threads) @@ -142,20 +153,26 @@ fn create_thread_pool(num_threads: usize) -> ThreadPool { .expect("new rayon threadpool") } -pub fn execute_batch( - batch: &TransactionBatchWithIndexes, - bank: &Arc, - transaction_status_sender: Option<&TransactionStatusSender>, - replay_vote_sender: Option<&ReplayVoteSender>, - timings: &mut ExecuteTimings, +pub fn execute_batch<'a>( + batch: &'a TransactionBatchWithIndexes, + bank: &'a Arc, + transaction_status_sender: Option<&'a TransactionStatusSender>, + replay_vote_sender: Option<&'a ReplayVoteSender>, + timings: &'a mut ExecuteTimings, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &'a PrioritizationFeeCache, + // None is meaningfully used to detect this is called from the block producing unified + // scheduler. If so, suppress too verbose logging for the code path. + extra_pre_commit_callback: Option< + impl FnOnce(&Result) -> Result>, + >, ) -> Result<()> { let TransactionBatchWithIndexes { batch, transaction_indexes, } = batch; let record_token_balances = transaction_status_sender.is_some(); + let mut transaction_indexes = Cow::from(transaction_indexes); let mut mint_decimals: HashMap = HashMap::new(); @@ -165,14 +182,53 @@ pub fn execute_batch( vec![] }; - let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), - timings, - log_messages_bytes_limit, - ); + let pre_commit_callback = |timings: &mut _, processing_results: &_| -> PreCommitResult { + match extra_pre_commit_callback { + None => { + get_first_error(batch, processing_results)?; + check_block_cost_limits_if_enabled(batch, bank, timings, processing_results)?; + Ok(None) + } + Some(extra_pre_commit_callback) => { + // We're entering into the block-producing unified scheduler special case... + // `processing_results` should always contain exactly only 1 result in that case. + assert_eq!(processing_results.len(), 1); + assert!(transaction_indexes.is_empty()); + + // From now on, we need to freeze-lock the tpu bank, in order to prevent it from + // freezing in the middle of this code-path. Otherwise, the assertion at the start + // of commit_transactions() would trigger panic because it's fatal runtime + // invariant violation. + let freeze_lock = bank.freeze_lock(); + + if let Some(index) = extra_pre_commit_callback(&processing_results[0])? { + let transaction_indexes = transaction_indexes.to_mut(); + // Adjust the empty new vec with the exact needed capacity. Otherwise, excess + // cap would be reserved on `.push()` in it. + transaction_indexes.reserve_exact(1); + transaction_indexes.push(index); + } + // At this point, poh should have been succeeded so it's guaranteed that the bank + // hasn't been frozen yet and we're still holding the lock. So, it's okay to pass + // down freeze_lock without any introspection here to be unconditionally dropped + // after commit_transactions(). This reasoning is same as + // solana_core::banking_stage::Consumer::execute_and_commit_transactions_locked() + Ok(Some(freeze_lock)) + } + } + }; + + let (commit_results, balances) = batch + .bank() + .load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), + timings, + log_messages_bytes_limit, + pre_commit_callback, + )?; bank_utils::find_and_send_votes( batch.sanitized_transactions(), @@ -180,29 +236,12 @@ pub fn execute_batch( replay_vote_sender, ); - let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank - .feature_set - .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) - { - check_block_cost_limits(bank, &commit_results, batch.sanitized_transactions()) - } else { - Ok(()) - }); - - timings.saturating_add_in_place( - ExecuteTimingType::CheckBlockLimitsUs, - check_block_cost_limits_us, - ); - check_block_cost_limits_result?; - let committed_transactions = commit_results .iter() .zip(batch.sanitized_transactions()) .filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx)) .collect_vec(); - let first_err = get_first_error(batch, &commit_results); - if let Some(transaction_status_sender) = transaction_status_sender { let transactions: Vec = batch .sanitized_transactions() @@ -224,13 +263,13 @@ pub fn execute_batch( commit_results, balances, token_balances, - transaction_indexes.to_vec(), + transaction_indexes.into_owned(), ); } prioritization_fee_cache.update(bank, committed_transactions.into_iter()); - first_err.map(|(result, _)| result).unwrap_or(Ok(())) + Ok(()) } // collect transactions actual execution costs, subject to block limits; @@ -238,20 +277,20 @@ pub fn execute_batch( // reported to metric `replay-stage-mark_dead_slot` fn check_block_cost_limits( bank: &Bank, - commit_results: &[TransactionCommitResult], + processing_results: &[TransactionProcessingResult], sanitized_transactions: &[impl TransactionWithMeta], ) -> Result<()> { - assert_eq!(sanitized_transactions.len(), commit_results.len()); + assert_eq!(sanitized_transactions.len(), processing_results.len()); - let tx_costs_with_actual_execution_units: Vec<_> = commit_results + let tx_costs_with_actual_execution_units: Vec<_> = processing_results .iter() .zip(sanitized_transactions) - .filter_map(|(commit_result, tx)| { - if let Ok(committed_tx) = commit_result { + .filter_map(|(processing_result, tx)| { + if let Ok(processed_tx) = processing_result { Some(CostModel::calculate_cost_for_executed_transaction( tx, - committed_tx.executed_units, - committed_tx.loaded_account_stats.loaded_accounts_data_size, + processed_tx.executed_units(), + processed_tx.loaded_accounts_data_size(), &bank.feature_set, )) } else { @@ -271,6 +310,28 @@ fn check_block_cost_limits( Ok(()) } +fn check_block_cost_limits_if_enabled( + batch: &TransactionBatch, + bank: &Bank, + timings: &mut ExecuteTimings, + processing_results: &[TransactionProcessingResult], +) -> Result<()> { + let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank + .feature_set + .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) + { + check_block_cost_limits(bank, processing_results, batch.sanitized_transactions()) + } else { + Ok(()) + }); + + timings.saturating_add_in_place( + ExecuteTimingType::CheckBlockLimitsUs, + check_block_cost_limits_us, + ); + check_block_cost_limits_result +} + #[derive(Default)] pub struct ExecuteBatchesInternalMetrics { execution_timings_per_thread: HashMap, @@ -322,6 +383,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, prioritization_fee_cache, + None:: _>, )); let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); @@ -2202,11 +2264,13 @@ pub fn process_single_slot( } #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum TransactionStatusMessage { Batch(TransactionStatusBatch), Freeze(Slot), } +#[derive(Debug)] pub struct TransactionStatusBatch { pub slot: Slot, pub transactions: Vec, @@ -2332,12 +2396,10 @@ pub mod tests { solana_sdk::{ account::{AccountSharedData, WritableAccount}, epoch_schedule::EpochSchedule, - fee::FeeDetails, hash::Hash, instruction::{Instruction, InstructionError}, native_token::LAMPORTS_PER_SOL, pubkey::Pubkey, - rent_debits::RentDebits, signature::{Keypair, Signer}, signer::SeedDerivable, system_instruction::SystemError, @@ -2345,8 +2407,9 @@ pub mod tests { transaction::{Transaction, TransactionError}, }, solana_svm::{ - transaction_commit_result::CommittedTransaction, - transaction_execution_result::TransactionLoadedAccountsStats, + account_loader::LoadedTransaction, + transaction_execution_result::{ExecutedTransaction, TransactionExecutionDetails}, + transaction_processing_result::ProcessedTransaction, transaction_processor::ExecutionRecordingConfig, }, solana_vote::vote_account::VoteAccount, @@ -2355,8 +2418,8 @@ pub mod tests { vote_state::{TowerSync, VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_transaction, }, - std::{collections::BTreeSet, sync::RwLock}, - test_case::test_case, + std::{collections::BTreeSet, slice, sync::RwLock}, + test_case::{test_case, test_matrix}, trees::tr, }; @@ -3355,17 +3418,19 @@ pub mod tests { assert_matches!(bank.transfer(4, &mint_keypair, &keypair2.pubkey()), Ok(_)); assert_matches!(bank.transfer(4, &mint_keypair, &keypair4.pubkey()), Ok(_)); + let good_tx = system_transaction::transfer( + &keypair1, + &mint_keypair.pubkey(), + 1, + bank.last_blockhash(), + ); + // construct an Entry whose 2nd transaction would cause a lock conflict with previous entry let entry_1_to_mint = next_entry( &bank.last_blockhash(), 1, vec![ - system_transaction::transfer( - &keypair1, - &mint_keypair.pubkey(), - 1, - bank.last_blockhash(), - ), + good_tx.clone(), system_transaction::transfer( &keypair4, &keypair4.pubkey(), @@ -3394,14 +3459,16 @@ pub mod tests { ], ); - assert!(process_entries_for_tests_without_scheduler( - &bank, - vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], - ) - .is_err()); + assert_matches!( + process_entries_for_tests_without_scheduler( + &bank, + vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], + ), + Err(TransactionError::BlockhashNotFound) + ); - // First transaction in first entry succeeded, so keypair1 lost 1 lamport - assert_eq!(bank.get_balance(&keypair1.pubkey()), 3); + // First transaction in first entry was rolled-back, so keypair1 didn't lost 1 lamport + assert_eq!(bank.get_balance(&keypair1.pubkey()), 4); assert_eq!(bank.get_balance(&keypair2.pubkey()), 4); // Check all accounts are unlocked @@ -3417,6 +3484,16 @@ pub mod tests { for result in batch2.lock_results() { assert!(result.is_ok()); } + drop(batch2); + + // ensure good_tx will succeed and was just rolled back above due to other failing tx + let entry_3 = next_entry(&entry_2_to_3_mint_to_1.hash, 1, vec![good_tx]); + assert_matches!( + process_entries_for_tests_without_scheduler(&bank, vec![entry_3]), + Ok(()) + ); + // First transaction in third entry succeeded, so keypair1 lost 1 lamport + assert_eq!(bank.get_balance(&keypair1.pubkey()), 3); } #[test_case(true; "rent_collected")] @@ -4392,7 +4469,7 @@ pub mod tests { &mut ExecuteTimings::default(), None, ); - let (err, signature) = get_first_error(&batch, &commit_results).unwrap(); + let (err, signature) = do_get_first_error(&batch, &commit_results).unwrap(); assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound); assert_eq!(signature, account_not_found_sig); } @@ -5041,6 +5118,133 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } + enum TxResult { + ExecutedWithSuccess, + ExecutedWithFailure, + NotExecuted, + } + + #[test_matrix( + [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted], + [Ok(None), Ok(Some(4)), Err(TransactionError::CommitCancelled)] + )] + fn test_execute_batch_pre_commit_callback( + tx_result: TxResult, + poh_result: Result>, + ) { + solana_logger::setup(); + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests(); + let bank = Arc::new(bank); + let pubkey = solana_sdk::pubkey::new_rand(); + let (tx, expected_tx_result) = match tx_result { + TxResult::ExecutedWithSuccess => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_config.hash(), + )), + Ok(()), + ), + TxResult::ExecutedWithFailure => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 100000000, + genesis_config.hash(), + )), + Ok(()), + ), + TxResult::NotExecuted => ( + RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + Hash::default(), + )), + Err(TransactionError::BlockhashNotFound), + ), + }; + let mut batch = TransactionBatch::new( + vec![Ok(()); 1], + &bank, + OwnedOrBorrowed::Borrowed(slice::from_ref(&tx)), + ); + batch.set_needs_unlock(false); + let poh_with_index = matches!(&poh_result, Ok(Some(_))); + let batch = TransactionBatchWithIndexes { + batch, + transaction_indexes: vec![], + }; + let prioritization_fee_cache = PrioritizationFeeCache::default(); + let mut timing = ExecuteTimings::default(); + let (sender, receiver) = crossbeam_channel::unbounded(); + + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + let should_commit = poh_result.is_ok(); + let mut is_called = false; + let result = execute_batch( + &batch, + &bank, + Some(&TransactionStatusSender { sender }), + None, + &mut timing, + None, + &prioritization_fee_cache, + Some(|processing_result: &'_ Result<_>| { + is_called = true; + let ok = poh_result?; + if let Err(error) = processing_result { + Err(error.clone())?; + }; + Ok(ok) + }), + ); + + // pre_commit_callback() should alwasy be called regardless of tx_result + assert!(is_called); + + if should_commit { + assert_eq!(result, expected_tx_result); + if expected_tx_result.is_ok() { + assert_eq!(bank.transaction_count(), 1); + if matches!(tx_result, TxResult::ExecutedWithFailure) { + assert_eq!(bank.transaction_error_count(), 1); + } else { + assert_eq!(bank.transaction_error_count(), 0); + } + } else { + assert_eq!(bank.transaction_count(), 0); + } + } else { + assert_matches!(result, Err(TransactionError::CommitCancelled)); + assert_eq!(bank.transaction_count(), 0); + } + if poh_with_index && expected_tx_result.is_ok() { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![4_usize] + ); + } else if should_commit && expected_tx_result.is_ok() { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes.is_empty() + ); + } else { + assert_matches!(receiver.try_recv(), Err(_)); + } + } + #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; @@ -5198,27 +5402,31 @@ pub mod tests { .unwrap() .set_limits(u64::MAX, block_limit, u64::MAX); let txs = vec![tx.clone(), tx]; - let commit_results = vec![ - Ok(CommittedTransaction { - status: Ok(()), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: actual_execution_cu, - fee_details: FeeDetails::default(), - rent_debits: RentDebits::default(), - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_data_size: actual_loaded_accounts_data_size, - loaded_accounts_count: 2, + let processing_results = vec![ + Ok(ProcessedTransaction::Executed(Box::new( + ExecutedTransaction { + execution_details: TransactionExecutionDetails { + status: Ok(()), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units: actual_execution_cu, + accounts_data_len_delta: 0, + }, + loaded_transaction: LoadedTransaction { + loaded_accounts_data_size: actual_loaded_accounts_data_size, + ..LoadedTransaction::default() + }, + programs_modified_by_tx: HashMap::new(), }, - }), + ))), Err(TransactionError::AccountNotFound), ]; - assert!(check_block_cost_limits(&bank, &commit_results, &txs).is_ok()); + assert!(check_block_cost_limits(&bank, &processing_results, &txs).is_ok()); assert_eq!( Err(TransactionError::WouldExceedMaxBlockCostLimit), - check_block_cost_limits(&bank, &commit_results, &txs) + check_block_cost_limits(&bank, &processing_results, &txs) ); } } diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 0fcdb217e05fae..5a16cc791b0d42 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -135,7 +135,7 @@ pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, @@ -1139,11 +1139,12 @@ impl PohRecorder { } } -pub fn create_test_recorder( +fn do_create_test_recorder( bank: Arc, blockstore: Arc, poh_config: Option, leader_schedule_cache: Option>, + track_transaction_indexes: bool, ) -> ( Arc, Arc>, @@ -1169,7 +1170,10 @@ pub fn create_test_recorder( ); let ticks_per_slot = bank.ticks_per_slot(); - poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false); + poh_recorder.set_bank( + BankWithScheduler::new_without_scheduler(bank), + track_transaction_indexes, + ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), @@ -1184,6 +1188,34 @@ pub fn create_test_recorder( (exit, poh_recorder, poh_service, entry_receiver) } +pub fn create_test_recorder( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false) +} + +pub fn create_test_recorder_with_index_tracking( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true) +} + #[cfg(test)] mod tests { use { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b163d9e8664892..fd4cd2c2049ed5 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6949,6 +6949,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -8379,6 +8380,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1556a459f5e0c0..0687e594a9dead 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -82,6 +82,7 @@ solana-svm-transaction = { workspace = true } solana-system-program = { workspace = true, optional = true } solana-timings = { workspace = true } solana-transaction-status-client-types = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a37aa26460cb62..11b34db594c4dd 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -249,7 +249,7 @@ struct RentMetrics { pub type BankStatusCache = StatusCache>; #[cfg_attr( feature = "frozen-abi", - frozen_abi(digest = "BHg4qpwegtaJypLUqAdjQYzYeLfEGf6tA4U5cREbHMHi") + frozen_abi(digest = "4e7a7AAsQrM5Lp5bhREdVZ5QGZfyETbBthhWjYMYb6zS") )] pub type BankSlotDelta = SlotDelta>; @@ -343,7 +343,7 @@ pub struct TransactionSimulationResult { pub inner_instructions: Option>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, @@ -360,6 +360,8 @@ impl TransactionBalancesSet { } pub type TransactionBalances = Vec>; +pub type PreCommitResult<'a> = Result>>; + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum TransactionLogCollectorFilter { All, @@ -3787,52 +3789,56 @@ impl Bank { ) -> Vec { processing_results .into_iter() - .map(|processing_result| match processing_result? { - ProcessedTransaction::Executed(executed_tx) => { - let execution_details = executed_tx.execution_details; - let LoadedTransaction { - rent_debits, - accounts: loaded_accounts, - loaded_accounts_data_size, - fee_details, - .. - } = executed_tx.loaded_transaction; - - // Rent is only collected for successfully executed transactions - let rent_debits = if execution_details.was_successful() { - rent_debits - } else { - RentDebits::default() - }; + .map(|processing_result| { + let processing_result = processing_result?; + let executed_units = processing_result.executed_units(); + let loaded_accounts_data_size = processing_result.loaded_accounts_data_size(); + + match processing_result { + ProcessedTransaction::Executed(executed_tx) => { + let execution_details = executed_tx.execution_details; + let LoadedTransaction { + rent_debits, + accounts: loaded_accounts, + fee_details, + .. + } = executed_tx.loaded_transaction; + + // Rent is only collected for successfully executed transactions + let rent_debits = if execution_details.was_successful() { + rent_debits + } else { + RentDebits::default() + }; - Ok(CommittedTransaction { - status: execution_details.status, - log_messages: execution_details.log_messages, - inner_instructions: execution_details.inner_instructions, - return_data: execution_details.return_data, - executed_units: execution_details.executed_units, - fee_details, - rent_debits, + Ok(CommittedTransaction { + status: execution_details.status, + log_messages: execution_details.log_messages, + inner_instructions: execution_details.inner_instructions, + return_data: execution_details.return_data, + executed_units, + fee_details, + rent_debits, + loaded_account_stats: TransactionLoadedAccountsStats { + loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_data_size, + }, + }) + } + ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { + status: Err(fees_only_tx.load_error), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units, + rent_debits: RentDebits::default(), + fee_details: fees_only_tx.fee_details, loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_count: fees_only_tx.rollback_accounts.count(), loaded_accounts_data_size, }, - }) + }), } - ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { - status: Err(fees_only_tx.load_error), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: 0, - rent_debits: RentDebits::default(), - fee_details: fees_only_tx.fee_details, - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: fees_only_tx.rollback_accounts.count(), - loaded_accounts_data_size: fees_only_tx.rollback_accounts.data_size() - as u32, - }, - }), }) .collect() } @@ -4520,6 +4526,54 @@ impl Bank { timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, ) -> (Vec, TransactionBalancesSet) { + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + None:: _>, + ) + .unwrap() + } + + pub fn load_execute_and_commit_transactions_with_pre_commit_callback<'a>( + &'a self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: impl FnOnce( + &mut ExecuteTimings, + &[TransactionProcessingResult], + ) -> PreCommitResult<'a>, + ) -> Result<(Vec, TransactionBalancesSet)> { + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + Some(pre_commit_callback), + ) + } + + fn do_load_execute_and_commit_transactions_with_pre_commit_callback<'a>( + &'a self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: Option< + impl FnOnce(&mut ExecuteTimings, &[TransactionProcessingResult]) -> PreCommitResult<'a>, + >, + ) -> Result<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -4545,21 +4599,31 @@ impl Bank { }, ); + // pre_commit_callback could initiate an atomic operation (i.e. poh recording with block + // producing unified scheduler). in that case, it returns Some(freeze_lock), which should + // unlocked only after calling commit_transactions() immediately after calling the + // callback. + let freeze_lock = if let Some(pre_commit_callback) = pre_commit_callback { + pre_commit_callback(timings, &processing_results)? + } else { + None + }; let commit_results = self.commit_transactions( batch.sanitized_transactions(), processing_results, &processed_counts, timings, ); + drop(freeze_lock); let post_balances = if collect_balances { self.collect_balances(batch) } else { vec![] }; - ( + Ok(( commit_results, TransactionBalancesSet::new(pre_balances, post_balances), - ) + )) } /// Process a Transaction. This is used for unit tests and simply calls the vector diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index fb6bb3dc79c732..69e806c5f48ce5 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -30,6 +30,7 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_timings::ExecuteTimings, + solana_unified_scheduler_logic::SchedulingMode, std::{ fmt::{self, Debug}, mem, @@ -227,13 +228,29 @@ pub type SchedulerId = u64; /// `SchedulingContext`s. #[derive(Clone, Debug)] pub struct SchedulingContext { - // mode: SchedulingMode, // this will be added later. + mode: SchedulingMode, bank: Arc, } impl SchedulingContext { pub fn new(bank: Arc) -> Self { - Self { bank } + // mode will be configurable later + Self { + mode: SchedulingMode::BlockVerification, + bank, + } + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn for_production(bank: Arc) -> Self { + Self { + mode: SchedulingMode::BlockProduction, + bank, + } + } + + pub fn mode(&self) -> SchedulingMode { + self.mode } pub fn bank(&self) -> &Arc { diff --git a/sdk/transaction-error/src/lib.rs b/sdk/transaction-error/src/lib.rs index 433a48b0122e31..2f5b72b960833f 100644 --- a/sdk/transaction-error/src/lib.rs +++ b/sdk/transaction-error/src/lib.rs @@ -137,6 +137,9 @@ pub enum TransactionError { /// Program cache hit max limit. ProgramCacheHitMaxLimit, + + /// Commit cancelled internally. + CommitCancelled, } impl std::error::Error for TransactionError {} @@ -220,6 +223,8 @@ impl fmt::Display for TransactionError { => f.write_str("Sum of account balances before and after transaction do not match"), Self::ProgramCacheHitMaxLimit => f.write_str("Program cache hit max limit"), + Self::CommitCancelled + => f.write_str("CommitCancelled"), } } } diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index d0fa74a2104707..5748b05655edba 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -63,6 +63,7 @@ enum TransactionErrorType { PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35; UNBALANCED_TRANSACTION = 36; PROGRAM_CACHE_HIT_MAX_LIMIT = 37; + COMMIT_CANCELLED = 38; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 13fb31d9665f2c..cd53e75cbbc7fa 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -852,6 +852,7 @@ impl TryFrom for TransactionError { 34 => TransactionError::ResanitizationNeeded, 36 => TransactionError::UnbalancedTransaction, 37 => TransactionError::ProgramCacheHitMaxLimit, + 38 => TransactionError::CommitCancelled, _ => return Err("Invalid TransactionError"), }) } @@ -973,6 +974,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::ProgramCacheHitMaxLimit => { tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit } + TransactionError::CommitCancelled => { + tx_by_addr::TransactionErrorType::CommitCancelled + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => { diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 3230817a4da1d2..f38ef7446fa02c 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6768,6 +6768,7 @@ dependencies = [ "solana-svm-transaction", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -7715,6 +7716,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/svm/src/transaction_processing_result.rs b/svm/src/transaction_processing_result.rs index c8da2a941c1a9b..52532988f8f3d7 100644 --- a/svm/src/transaction_processing_result.rs +++ b/svm/src/transaction_processing_result.rs @@ -87,4 +87,17 @@ impl ProcessedTransaction { Self::FeesOnly { .. } => None, } } + + pub fn executed_units(&self) -> u64 { + self.execution_details() + .map(|detail| detail.executed_units) + .unwrap_or_default() + } + + pub fn loaded_accounts_data_size(&self) -> u32 { + match self { + Self::Executed(context) => context.loaded_transaction.loaded_accounts_data_size, + Self::FeesOnly(details) => details.rollback_accounts.data_size() as u32, + } + } } diff --git a/transaction-status/src/token_balances.rs b/transaction-status/src/token_balances.rs index 85a85a053f910f..36b46552cc687f 100644 --- a/transaction-status/src/token_balances.rs +++ b/transaction-status/src/token_balances.rs @@ -2,6 +2,7 @@ use crate::TransactionTokenBalance; pub type TransactionTokenBalances = Vec>; +#[derive(Debug)] pub struct TransactionTokenBalancesSet { pub pre_token_balances: TransactionTokenBalances, pub post_token_balances: TransactionTokenBalances, diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index e8e7501fd998cd..c7b5e7a1e8ca76 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -105,6 +105,12 @@ use { std::{collections::VecDeque, mem, sync::Arc}, }; +#[derive(Clone, Copy, Debug)] +pub enum SchedulingMode { + BlockVerification, + BlockProduction, +} + /// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`]. mod utils { use std::{ diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 3b0a0df66d0ec1..b720642a91906c 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -18,6 +18,7 @@ log = { workspace = true } qualifier_attr = { workspace = true } scopeguard = { workspace = true } solana-ledger = { workspace = true } +solana-poh = { workspace = true } solana-pubkey = { workspace = true } solana-runtime = { workspace = true } solana-runtime-transaction = { workspace = true } @@ -32,10 +33,13 @@ vec_extract_if_polyfill = { workspace = true } assert_matches = { workspace = true } lazy_static = { workspace = true } solana-clock = { workspace = true } +solana-entry = { workspace = true } +solana-hash = { workspace = true } solana-keypair = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-system-transaction = { workspace = true } +test-case = { workspace = true } [features] dev-context-only-utils = [] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c496ab455e1828..f926945c769089 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -20,6 +20,7 @@ use { solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, + solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_pubkey::Pubkey, solana_runtime::{ installed_scheduler_pool::{ @@ -35,7 +36,10 @@ use { solana_timings::ExecuteTimings, solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::{TransactionError, TransactionResult as Result}, - solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, + solana_unified_scheduler_logic::{ + SchedulingMode::{BlockProduction, BlockVerification}, + SchedulingStateMachine, Task, UsageQueue, + }, static_assertions::const_assert_eq, std::{ fmt::Debug, @@ -100,6 +104,7 @@ pub struct HandlerContext { transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + transaction_recorder: Option, } pub type DefaultSchedulerPool = @@ -176,6 +181,8 @@ where transaction_status_sender, replay_vote_sender, prioritization_fee_cache, + // will be configurable later + transaction_recorder: None, }, weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), @@ -436,9 +443,47 @@ impl TaskHandler for DefaultTaskHandler { let index = task.task_index(); let batch = bank.prepare_unlocked_batch_from_single_tx(transaction); + let transaction_indexes = match scheduling_context.mode() { + BlockVerification => vec![index], + BlockProduction => { + // Create a placeholder vec, which will be populated later if + // transaction_status_sender is Some(_). + // transaction_status_sender is usually None for staked nodes because it's only + // used for RPC-related additional data recording. However, a staked node could + // also be running with rpc functionalities during development. So, we need to + // correctly support the use case for produced blocks as well, like verified blocks + // via the replaying stage. + // Refer `record_token_balances` in `execute_batch()` as this treatment is mirrored + // from it. + vec![] + } + }; let batch_with_indexes = TransactionBatchWithIndexes { batch, - transaction_indexes: vec![index], + transaction_indexes, + }; + + let pre_commit_callback = match scheduling_context.mode() { + BlockVerification => None, + BlockProduction => Some(|processing_result: &'_ Result<_>| { + if let Err(error) = processing_result { + Err(error.clone())?; + }; + + let RecordTransactionsSummary { + result, + starting_transaction_index, + .. + } = handler_context + .transaction_recorder + .as_ref() + .unwrap() + .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + match result { + Ok(()) => Ok(starting_transaction_index), + Err(_) => Err(TransactionError::CommitCancelled), + } + }), }; *result = execute_batch( @@ -449,6 +494,7 @@ impl TaskHandler for DefaultTaskHandler { timings, handler_context.log_messages_bytes_limit, &handler_context.prioritization_fee_cache, + pre_commit_callback, ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } @@ -1473,7 +1519,15 @@ mod tests { crate::sleepless_testing, assert_matches::assert_matches, solana_clock::{Slot, MAX_PROCESSING_AGE}, + solana_hash::Hash, solana_keypair::Keypair, + solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, + create_new_tmp_ledger_auto_delete, + leader_schedule_cache::LeaderScheduleCache, + }, + solana_poh::poh_recorder::create_test_recorder_with_index_tracking, solana_pubkey::Pubkey, solana_runtime::{ bank::Bank, @@ -1487,9 +1541,10 @@ mod tests { solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::TransactionError, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, thread::JoinHandle, }, + test_case::test_matrix, }; #[derive(Debug)] @@ -2937,10 +2992,146 @@ mod tests { transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, + transaction_recorder: None, }; let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default()); DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } + + enum TxResult { + ExecutedWithSuccess, + ExecutedWithFailure, + NotExecuted, + } + + #[test_matrix( + [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted], + [false, true] + )] + fn test_task_handler_poh_recording(tx_result: TxResult, should_succeed_to_record_to_poh: bool) { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + ref mint_keypair, + .. + } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank); + let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + + let (tx, expected_tx_result) = match tx_result { + TxResult::ExecutedWithSuccess => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::ExecutedWithFailure => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1_000_000, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::NotExecuted => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + Hash::default(), + ), + Err(TransactionError::BlockhashNotFound), + ), + }; + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + let result = &mut Ok(()); + let timings = &mut ExecuteTimings::default(); + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let scheduling_context = &SchedulingContext::for_production(bank.clone()); + let (sender, receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let handler_context = &HandlerContext { + log_messages_bytes_limit: None, + transaction_status_sender: Some(TransactionStatusSender { sender }), + replay_vote_sender: None, + prioritization_fee_cache, + transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), + }; + + let task = + SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default()); + + // wait until the poh's working bank is cleared. + // also flush signal_receiver after that. + if !should_succeed_to_record_to_poh { + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + while signal_receiver.try_recv().is_ok() {} + } + + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); + + if should_succeed_to_record_to_poh { + if expected_tx_result.is_ok() { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + if matches!(tx_result, TxResult::ExecutedWithFailure) { + assert_eq!(bank.transaction_error_count(), 1); + } else { + assert_eq!(bank.transaction_error_count(), 0); + } + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch( + TransactionStatusBatch { .. } + )) + ); + assert_matches!( + signal_receiver.try_recv(), + Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) + if transactions == vec![tx.to_versioned_transaction()] + ); + } else { + assert_eq!(result, &expected_tx_result); + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + } else { + if expected_tx_result.is_ok() { + assert_matches!(result, Err(TransactionError::CommitCancelled)); + } else { + assert_eq!(result, &expected_tx_result); + } + + assert_eq!(bank.transaction_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } }