diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 928a67dc4d84ca..62a604cc5e1392 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -13,7 +13,10 @@ use { hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, }, std::{ - collections::hash_map::{Entry, HashMap}, + collections::{ + hash_map::{Entry, HashMap}, + HashSet, + }, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -88,16 +91,6 @@ impl TransactionInfo { last_sent_time, } } - - fn get_max_retries( - &self, - default_max_retries: Option, - service_max_retries: usize, - ) -> Option { - self.max_retries - .or(default_max_retries) - .map(|max_retries| max_retries.min(service_max_retries)) - } } #[derive(Default, Debug, PartialEq, Eq)] @@ -108,7 +101,6 @@ struct ProcessTransactionsResult { max_retries_elapsed: u64, failed: u64, retained: u64, - last_sent_time: Option, } #[derive(Clone, Debug)] @@ -143,7 +135,7 @@ impl Default for Config { /// The maximum duration the retry thread may be configured to sleep before /// processing the transactions that need to be retried. -pub const MAX_RETRY_SLEEP_MS: u64 = 1_000; +pub const MAX_RETRY_SLEEP_MS: u64 = 1000; impl SendTransactionService { pub fn new( @@ -176,8 +168,6 @@ impl SendTransactionService { client.clone(), retry_transactions.clone(), stats_report.clone(), - config.service_max_retries, - config.default_max_retries, config.batch_send_rate_ms, config.batch_size, config.retry_pool_max_size, @@ -203,14 +193,11 @@ impl SendTransactionService { } /// Thread responsible for receiving transactions from RPC clients. - #[allow(clippy::too_many_arguments)] fn receive_txn_thread( receiver: Receiver, client: Client, retry_transactions: Arc>>, stats_report: Arc, - service_max_retries: usize, - default_max_retries: Option, batch_send_rate_ms: u64, batch_size: usize, retry_pool_max_size: usize, @@ -274,17 +261,9 @@ impl SendTransactionService { { // take a lock of retry_transactions and move the batch to the retry set. let mut retry_transactions = retry_transactions.lock().unwrap(); - let mut transactions_to_retry: usize = 0; + let transactions_to_retry = transactions.len(); let mut transactions_added_to_retry: usize = 0; for (signature, mut transaction_info) in transactions.drain() { - // drop transactions with 0 max retries - let max_retries = transaction_info - .get_max_retries(default_max_retries, service_max_retries); - if max_retries == Some(0) { - continue; - } - transactions_to_retry += 1; - let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); if let Entry::Vacant(_) = entry { @@ -326,20 +305,19 @@ impl SendTransactionService { exit: Arc, ) -> JoinHandle<()> { info!("Starting send-transaction-service::retry_thread with config."); - let retry_interval_ms_default = MAX_RETRY_SLEEP_MS.min(retry_rate_ms); - let mut retry_interval_ms = retry_interval_ms_default; Builder::new() .name("solStxRetry".to_string()) .spawn(move || loop { - sleep(Duration::from_millis(retry_interval_ms)); + let retry_interval_ms = retry_rate_ms; + let stats = &stats_report.stats; + sleep(Duration::from_millis( + MAX_RETRY_SLEEP_MS.min(retry_interval_ms), + )); if exit.load(Ordering::Relaxed) { break; } let mut transactions = retry_transactions.lock().unwrap(); - if transactions.is_empty() { - retry_interval_ms = retry_interval_ms_default; - } else { - let stats = &stats_report.stats; + if !transactions.is_empty() { stats .retry_queue_size .store(transactions.len() as u64, Ordering::Relaxed); @@ -348,7 +326,7 @@ impl SendTransactionService { (bank_forks.root_bank(), bank_forks.working_bank()) }; - let result = Self::process_transactions( + let _result = Self::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -360,17 +338,6 @@ impl SendTransactionService { stats, ); stats_report.report(); - - // to send transactions as soon as possible we adjust retry interval - retry_interval_ms = retry_interval_ms_default - .checked_sub( - result - .last_sent_time - .and_then(|last| Instant::now().checked_duration_since(last)) - .and_then(|interval| interval.as_millis().try_into().ok()) - .unwrap_or(0), - ) - .unwrap_or(retry_interval_ms_default); } }) .unwrap() @@ -390,8 +357,7 @@ impl SendTransactionService { ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); - let mut batched_transactions = Vec::new(); - let mut exceeded_retries_transactions = Vec::new(); + let mut batched_transactions = HashSet::new(); let retry_rate = Duration::from_millis(retry_rate_ms); transactions.retain(|signature, transaction_info| { @@ -410,8 +376,7 @@ impl SendTransactionService { let now = Instant::now(); let expired = transaction_info .last_sent_time - .and_then(|last| now.checked_duration_since(last)) - .map(|elapsed| elapsed >= retry_rate) + .map(|last| now.duration_since(last) >= retry_rate) .unwrap_or(false); let verify_nonce_account = nonce_account::verify_nonce_account(&nonce_account, &durable_nonce); @@ -450,36 +415,21 @@ impl SendTransactionService { let now = Instant::now(); let need_send = transaction_info .last_sent_time - .and_then(|last| now.checked_duration_since(last)) - .map(|elapsed| elapsed >= retry_rate) + .map(|last| now.duration_since(last) >= retry_rate) .unwrap_or(true); if need_send { if transaction_info.last_sent_time.is_some() { // Transaction sent before is unknown to the working bank, it might have been - // dropped or landed in another fork. Re-send it. + // dropped or landed in another fork. Re-send it info!("Retrying transaction: {}", signature); result.retried += 1; transaction_info.retries += 1; + stats.retries.fetch_add(1, Ordering::Relaxed); } - batched_transactions.push(*signature); + batched_transactions.insert(*signature); transaction_info.last_sent_time = Some(now); - - let max_retries = transaction_info - .get_max_retries(default_max_retries, service_max_retries); - if let Some(max_retries) = max_retries { - if transaction_info.retries >= max_retries { - exceeded_retries_transactions.push(*signature); - } - } - } else if let Some(last) = transaction_info.last_sent_time { - result.last_sent_time = Some( - result - .last_sent_time - .map(|result_last| result_last.min(last)) - .unwrap_or(last), - ); } true } @@ -497,14 +447,12 @@ impl SendTransactionService { } }); - stats.retries.fetch_add(result.retried, Ordering::Relaxed); - if !batched_transactions.is_empty() { // Processing the transactions in batch - let wire_transactions = batched_transactions + let wire_transactions = transactions .iter() - .filter_map(|signature| transactions.get(signature)) - .map(|transaction_info| transaction_info.wire_transaction.clone()); + .filter(|(signature, _)| batched_transactions.contains(signature)) + .map(|(_, transaction_info)| transaction_info.wire_transaction.clone()); let iter = wire_transactions.chunks(batch_size); for chunk in &iter { @@ -512,16 +460,6 @@ impl SendTransactionService { client.send_transactions_in_batch(chunk, stats); } } - - result.max_retries_elapsed += exceeded_retries_transactions.len() as u64; - stats - .transactions_exceeding_max_retries - .fetch_add(result.max_retries_elapsed, Ordering::Relaxed); - for signature in exceeded_retries_transactions { - info!("Dropping transaction due to max retries: {signature}"); - transactions.remove(&signature); - } - result } @@ -883,12 +821,31 @@ mod test { config.batch_size, &stats, ); - assert!(transactions.is_empty()); + assert_eq!(transactions.len(), 1); assert_eq!( result, ProcessTransactionsResult { retried: 1, - max_retries_elapsed: 2, + max_retries_elapsed: 1, + ..ProcessTransactionsResult::default() + } + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &mut transactions, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, + &stats, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + max_retries_elapsed: 1, ..ProcessTransactionsResult::default() } );