diff --git a/Cargo.lock b/Cargo.lock index a7b643f06c62d0..5ab5d59095b085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9129,6 +9129,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 5c01911b5f9c6c..c9b7f13cac7e3a 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -7670,6 +7670,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index a69c366a358fdc..07ad3f5a5b886c 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] crossbeam-channel = { workspace = true } +itertools = { workspace = true } log = { workspace = true } solana-client = { workspace = true } solana-connection-cache = { workspace = true } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 8cc21b12359639..dbe89367f685a0 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -1,14 +1,15 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, - log::*, - solana_client::connection_cache::{ConnectionCache, Protocol}, + itertools::Itertools, + log::{warn, *}, + solana_client::connection_cache::ConnectionCache, solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_measure::measure::Measure, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ - clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, - signature::Signature, timing::AtomicInterval, transport::TransportError, + hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, + timing::AtomicInterval, }, std::{ collections::{ @@ -186,60 +187,60 @@ where /// Metrics of the send-transaction-service. #[derive(Default)] -struct SendTransactionServiceStats { +pub struct SendTransactionServiceStats { /// Count of the received transactions - received_transactions: AtomicU64, + pub received_transactions: AtomicU64, /// Count of the received duplicate transactions - received_duplicate_transactions: AtomicU64, + pub received_duplicate_transactions: AtomicU64, /// Count of transactions sent in batch - sent_transactions: AtomicU64, + pub sent_transactions: AtomicU64, /// Count of transactions not being added to retry queue /// due to queue size limit - retry_queue_overflow: AtomicU64, + pub retry_queue_overflow: AtomicU64, /// retry queue size - retry_queue_size: AtomicU64, + pub retry_queue_size: AtomicU64, /// The count of calls of sending transactions which can be in batch or single. - send_attempt_count: AtomicU64, + pub send_attempt_count: AtomicU64, /// Time spent on transactions in micro seconds - send_us: AtomicU64, + pub send_us: AtomicU64, /// Send failure count - send_failure_count: AtomicU64, + pub send_failure_count: AtomicU64, /// Count of nonced transactions - nonced_transactions: AtomicU64, + pub nonced_transactions: AtomicU64, /// Count of rooted transactions - rooted_transactions: AtomicU64, + pub rooted_transactions: AtomicU64, /// Count of expired transactions - expired_transactions: AtomicU64, + pub expired_transactions: AtomicU64, /// Count of transactions exceeding max retries - transactions_exceeding_max_retries: AtomicU64, + pub transactions_exceeding_max_retries: AtomicU64, /// Count of retries of transactions - retries: AtomicU64, + pub retries: AtomicU64, /// Count of transactions failed - failed_transactions: AtomicU64, + pub failed_transactions: AtomicU64, } #[derive(Default)] -struct SendTransactionServiceStatsReport { - stats: SendTransactionServiceStats, +pub(crate) struct SendTransactionServiceStatsReport { + pub stats: SendTransactionServiceStats, last_report: AtomicInterval, } impl SendTransactionServiceStatsReport { /// report metrics of the send transaction service - fn report(&self) { + pub fn report(&self) { if self .last_report .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) @@ -325,7 +326,7 @@ impl SendTransactionServiceStatsReport { } } -/// Report the send transaction memtrics for every 5 seconds. +/// Report the send transaction metrics for every 5 seconds. const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; impl SendTransactionService { @@ -368,26 +369,33 @@ impl SendTransactionService { let retry_transactions = Arc::new(Mutex::new(HashMap::new())); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + let client = ConnectionCacheClient::new( + connection_cache.clone(), + tpu_address, + config.tpu_peers, + leader_info, + config.leader_forward_count, + ); let receive_txn_thread = Self::receive_txn_thread( - tpu_address, receiver, - leader_info_provider.clone(), - connection_cache.clone(), - config.clone(), + client.clone(), retry_transactions.clone(), stats_report.clone(), + config.batch_send_rate_ms, + config.batch_size, + config.retry_pool_max_size, exit.clone(), ); let retry_thread = Self::retry_thread( - tpu_address, bank_forks.clone(), - leader_info_provider, - connection_cache.clone(), - config, + client, retry_transactions, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, stats_report, exit.clone(), ); @@ -400,26 +408,23 @@ impl SendTransactionService { /// Thread responsible for receiving transactions from RPC clients. fn receive_txn_thread( - tpu_address: SocketAddr, receiver: Receiver, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, stats_report: Arc, + batch_send_rate_ms: u64, + batch_size: usize, + retry_pool_max_size: usize, exit: Arc, ) -> JoinHandle<()> { let mut last_batch_sent = Instant::now(); let mut transactions = HashMap::new(); - info!( - "Starting send-transaction-service::receive_txn_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::receive_txn_thread with config.",); Builder::new() .name("solStxReceive".to_string()) .spawn(move || loop { - let recv_timeout_ms = config.batch_send_rate_ms; + let recv_timeout_ms = batch_send_rate_ms; let stats = &stats_report.stats; let recv_result = receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)); if exit.load(Ordering::Relaxed) { @@ -455,20 +460,17 @@ impl SendTransactionService { } if (!transactions.is_empty() - && last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms) - || transactions.len() >= config.batch_size + && last_batch_sent.elapsed().as_millis() as u64 >= batch_send_rate_ms) + || transactions.len() >= batch_size { stats .sent_transactions .fetch_add(transactions.len() as u64, Ordering::Relaxed); - Self::send_transactions_in_batch( - &tpu_address, - &transactions, - leader_info_provider.lock().unwrap().get_leader_info(), - &connection_cache, - &config, - stats, - ); + let wire_transactions = transactions + .values() + .map(|transaction_info| transaction_info.wire_transaction.clone()) + .collect::>>(); + client.send_transactions_in_batch(wire_transactions, stats); let last_sent_time = Instant::now(); { // take a lock of retry_transactions and move the batch to the retry set. @@ -479,7 +481,7 @@ impl SendTransactionService { let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); if let Entry::Vacant(_) = entry { - if retry_len >= config.retry_pool_max_size { + if retry_len >= retry_pool_max_size { break; } else { transaction_info.last_sent_time = Some(last_sent_time); @@ -506,23 +508,21 @@ impl SendTransactionService { /// Thread responsible for retrying transactions fn retry_thread( - tpu_address: SocketAddr, bank_forks: Arc>, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats_report: Arc, exit: Arc, ) -> JoinHandle<()> { - info!( - "Starting send-transaction-service::retry_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::retry_thread with config."); Builder::new() .name("solStxRetry".to_string()) .spawn(move || loop { - let retry_interval_ms = config.retry_rate_ms; + let retry_interval_ms = retry_rate_ms; let stats = &stats_report.stats; sleep(Duration::from_millis( MAX_RETRY_SLEEP_MS.min(retry_interval_ms), @@ -543,11 +543,12 @@ impl SendTransactionService { let _result = Self::process_transactions( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + retry_rate_ms, + service_max_retries, + default_max_retries, + batch_size, stats, ); stats_report.report(); @@ -556,60 +557,22 @@ impl SendTransactionService { .unwrap() } - /// Process transactions in batch. - fn send_transactions_in_batch( - tpu_address: &SocketAddr, - transactions: &HashMap, - leader_info: Option<&T>, - connection_cache: &Arc, - config: &Config, - stats: &SendTransactionServiceStats, - ) { - // Processing the transactions in batch - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) - .unwrap_or_default(); - let leader_addresses = Self::get_tpu_addresses_with_slots( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - let wire_transactions = transactions - .iter() - .map(|(_, transaction_info)| { - debug!( - "Sending transacation {} to (address, slot): {:?}", - transaction_info.signature, addresses, - ); - transaction_info.wire_transaction.as_ref() - }) - .collect::>(); - - for (address, _) in &addresses { - Self::send_transactions(address, &wire_transactions, connection_cache, stats); - } - } - /// Retry transactions sent before. fn process_transactions( working_bank: &Bank, root_bank: &Bank, - tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info_provider: &Arc>>, - connection_cache: &Arc, - config: &Config, + client: &ConnectionCacheClient, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats: &SendTransactionServiceStats, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); let mut batched_transactions = HashSet::new(); - let retry_rate = Duration::from_millis(config.retry_rate_ms); + let retry_rate = Duration::from_millis(retry_rate_ms); transactions.retain(|signature, transaction_info| { if transaction_info.durable_nonce_info.is_some() { @@ -647,8 +610,8 @@ impl SendTransactionService { let max_retries = transaction_info .max_retries - .or(config.default_max_retries) - .map(|max_retries| max_retries.min(config.service_max_retries)); + .or(default_max_retries) + .map(|max_retries| max_retries.min(service_max_retries)); if let Some(max_retries) = max_retries { if transaction_info.retries >= max_retries { @@ -703,70 +666,101 @@ impl SendTransactionService { let wire_transactions = transactions .iter() .filter(|(signature, _)| batched_transactions.contains(signature)) - .map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref()) - .collect::>(); - - let iter = wire_transactions.chunks(config.batch_size); - for chunk in iter { - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().collect::>()) - .unwrap_or_default(); - let mut leader_info_provider = leader_info_provider.lock().unwrap(); - let leader_info = leader_info_provider.get_leader_info(); - let leader_addresses = Self::get_tpu_addresses( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - for address in &addresses { - Self::send_transactions(address, chunk, connection_cache, stats); - } + .map(|(_, transaction_info)| transaction_info.wire_transaction.clone()); + + let iter = wire_transactions.chunks(batch_size); + for chunk in &iter { + let chunk = chunk.collect(); + client.send_transactions_in_batch(chunk, stats); } } result } - fn send_transaction( - tpu_address: &SocketAddr, - wire_transaction: &[u8], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_async(wire_transaction.to_vec()) + pub fn join(self) -> thread::Result<()> { + self.receive_txn_thread.join()?; + self.exit.store(true, Ordering::Relaxed); + self.retry_thread.join() } +} - fn send_transactions_with_metrics( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_batch_async(wire_transactions) +pub trait TransactionClient { + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ); +} + +pub struct ConnectionCacheClient { + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info_provider: Arc>>, + leader_forward_count: u64, +} + +// Manual implementation of Clone without requiring T to be Clone +impl Clone for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn clone(&self) -> Self { + Self { + connection_cache: Arc::clone(&self.connection_cache), + tpu_address: self.tpu_address, + tpu_peers: self.tpu_peers.clone(), + leader_info_provider: Arc::clone(&self.leader_info_provider), + leader_forward_count: self.leader_forward_count, + } + } +} + +impl ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + pub fn new( + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info: Option, + leader_forward_count: u64, + ) -> Self { + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + Self { + connection_cache, + tpu_address, + tpu_peers, + leader_info_provider, + leader_forward_count, + } + } + + fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> { + leader_info + .map(|leader_info| { + leader_info + .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol()) + }) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![&self.tpu_address]) } fn send_transactions( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, + &self, + peer: &SocketAddr, + wire_transactions: Vec>, stats: &SendTransactionServiceStats, ) { let mut measure = Measure::start("send-us"); - let result = if wire_transactions.len() == 1 { - Self::send_transaction(tpu_address, wire_transactions[0], connection_cache) - } else { - Self::send_transactions_with_metrics(tpu_address, wire_transactions, connection_cache) - }; + let conn = self.connection_cache.get_connection(peer); + let result = conn.send_data_batch_async(wire_transactions); if let Err(err) = result { warn!( "Failed to send transaction transaction to {}: {:?}", - tpu_address, err + self.tpu_address, err ); stats.send_failure_count.fetch_add(1, Ordering::Relaxed); } @@ -775,46 +769,31 @@ impl SendTransactionService { stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); } +} - fn get_tpu_addresses<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<&'a SocketAddr> { - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count, protocol)); - addresses - .map(|address_list| { - if address_list.is_empty() { - vec![tpu_address] - } else { - address_list - } - }) - .unwrap_or_else(|| vec![tpu_address]) - } - - fn get_tpu_addresses_with_slots<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<(&'a SocketAddr, Slot)> { - leader_info +impl TransactionClient for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + // Processing the transactions in batch + let mut addresses = self + .tpu_peers .as_ref() - .map(|leader_info| { - leader_info.get_leader_tpus_with_slots(config.leader_forward_count, protocol) - }) - .filter(|addresses| !addresses.is_empty()) - .unwrap_or_else(|| vec![(tpu_address, 0)]) - } + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); + let mut leader_info_provider = self.leader_info_provider.lock().unwrap(); + let leader_info = leader_info_provider.get_leader_info(); + let leader_addresses = self.get_tpu_addresses(leader_info); + addresses.extend(leader_addresses); - pub fn join(self) -> thread::Result<()> { - self.receive_txn_thread.join()?; - self.exit.store(true, Ordering::Relaxed); - self.retry_thread.join() + for address in &addresses { + self.send_transactions(address, wire_transactions.clone(), stats); + } } } @@ -900,6 +879,22 @@ mod test { } } + fn create_client( + tpu_peers: Option>, + leader_forward_count: u64, + ) -> ConnectionCacheClient { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + ConnectionCacheClient::new( + connection_cache, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + #[test] fn process_transactions() { solana_logger::setup(); @@ -907,7 +902,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -954,7 +948,6 @@ mod test { let mut transactions = HashMap::new(); info!("Expired transactions are dropped..."); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); transactions.insert( Signature::default(), @@ -967,15 +960,17 @@ mod test { Some(Instant::now()), ), ); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1002,11 +997,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1033,11 +1029,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1064,11 +1061,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1097,11 +1095,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1140,11 +1139,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1159,11 +1159,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1183,7 +1184,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -1251,17 +1251,17 @@ mod test { Some(Instant::now()), ), ); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1287,11 +1287,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1319,11 +1320,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1349,11 +1351,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1380,11 +1383,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1411,11 +1415,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1444,11 +1449,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1474,11 +1480,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 0); diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index ae54e8a5a0e58d..75468f7afb58ba 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6999,6 +6999,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache",