diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs
index a1605ff9888c47..7d194d044aa85c 100644
--- a/banking-bench/src/main.rs
+++ b/banking-bench/src/main.rs
@@ -474,6 +474,7 @@ fn main() {
             Arc::new(connection_cache),
             bank_forks.clone(),
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );

         // This is so that the signal_receiver does not go out of scope after the closure.
diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs
index 45c6e02e20f424..d0efbfafddfc0b 100644
--- a/core/benches/banking_stage.rs
+++ b/core/benches/banking_stage.rs
@@ -303,6 +303,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
         Arc::new(ConnectionCache::new("connection_cache_test")),
         bank_forks,
         &Arc::new(PrioritizationFeeCache::new(0u64)),
+        false,
     );

     let chunk_len = verified.len() / CHUNKS;
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 0cf3cace2f82c0..1bdd7defeec8e4 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -339,6 +339,7 @@ impl BankingStage {
         connection_cache: Arc<ConnectionCache>,
         bank_forks: Arc<RwLock<BankForks>>,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
+        enable_forwarding: bool,
     ) -> Self {
         Self::new_num_threads(
             block_production_method,
@@ -354,6 +355,7 @@ impl BankingStage {
             connection_cache,
             bank_forks,
             prioritization_fee_cache,
+            enable_forwarding,
         )
     }

@@ -372,6 +374,7 @@ impl BankingStage {
         connection_cache: Arc<ConnectionCache>,
         bank_forks: Arc<RwLock<BankForks>>,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
+        enable_forwarding: bool,
     ) -> Self {
         match block_production_method {
             BlockProductionMethod::ThreadLocalMultiIterator => {
@@ -403,6 +406,7 @@ impl BankingStage {
                 connection_cache,
                 bank_forks,
                 prioritization_fee_cache,
+                enable_forwarding,
             ),
         }
     }
@@ -505,6 +509,7 @@ impl BankingStage {
         connection_cache: Arc<ConnectionCache>,
         bank_forks: Arc<RwLock<BankForks>>,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
+        enable_forwarding: bool,
     ) -> Self {
         assert!(num_threads >= MIN_TOTAL_THREADS);
         // Single thread to generate entries from many banks.
@@ -586,13 +591,15 @@ impl BankingStage {
             )
         }

-        let forwarder = Forwarder::new(
-            poh_recorder.clone(),
-            bank_forks.clone(),
-            cluster_info.clone(),
-            connection_cache.clone(),
-            data_budget.clone(),
-        );
+        let forwarder = enable_forwarding.then(|| {
+            Forwarder::new(
+                poh_recorder.clone(),
+                bank_forks.clone(),
+                cluster_info.clone(),
+                connection_cache.clone(),
+                data_budget.clone(),
+            )
+        });

         // Spawn the central scheduler thread
         bank_thread_hdls.push({
@@ -883,6 +890,7 @@ mod tests {
             Arc::new(ConnectionCache::new("connection_cache_test")),
             bank_forks,
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );
         drop(non_vote_sender);
         drop(tpu_vote_sender);
@@ -938,6 +946,7 @@ mod tests {
             Arc::new(ConnectionCache::new("connection_cache_test")),
             bank_forks,
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );
         trace!("sending bank");
         drop(non_vote_sender);
@@ -1017,6 +1026,7 @@ mod tests {
             Arc::new(ConnectionCache::new("connection_cache_test")),
             bank_forks.clone(), // keep a local-copy of bank-forks so worker threads do not lose weak access to bank-forks
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );

         // fund another account so we can send 2 good transactions in a single batch.
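The gating in `new_central_scheduler` leans on `bool::then`, which turns the flag into an `Option<Forwarder>` without an explicit `if`/`else`. A minimal sketch of that pattern, with a stand-in `Forwarder` type (hypothetical, not the real banking_stage struct):

```rust
// Sketch of the `bool::then` gating used above; `Forwarder` is a stand-in.
struct Forwarder {
    data_budget: u64,
}

fn build_forwarder(enable_forwarding: bool) -> Option<Forwarder> {
    // `bool::then` evaluates the closure and wraps the result in `Some` only
    // when the flag is true; otherwise it yields `None` without running it.
    enable_forwarding.then(|| Forwarder { data_budget: 1024 })
}

fn main() {
    assert!(build_forwarder(true).is_some());
    assert!(build_forwarder(false).is_none());
}
```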
@@ -1378,6 +1388,7 @@ mod tests {
             Arc::new(ConnectionCache::new("connection_cache_test")),
             bank_forks,
             &Arc::new(PrioritizationFeeCache::new(0u64)),
+            false,
         );

         let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index 20462a2a1b42b2..1092e2e012925e 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -64,8 +64,8 @@ pub(crate) struct SchedulerController {
     timing_metrics: SchedulerTimingMetrics,
     /// Metric report handles for the worker threads.
     worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
-    /// State for forwarding packets to the leader.
-    forwarder: Forwarder,
+    /// State for forwarding packets to the leader, if enabled.
+    forwarder: Option<Forwarder>,
 }

 impl SchedulerController {
@@ -75,7 +75,7 @@ impl SchedulerController {
         bank_forks: Arc<RwLock<BankForks>>,
         scheduler: PrioGraphScheduler,
         worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
-        forwarder: Forwarder,
+        forwarder: Option<Forwarder>,
     ) -> Self {
         Self {
             decision_maker,
@@ -146,6 +146,7 @@ impl SchedulerController {
         &mut self,
         decision: &BufferedPacketsDecision,
     ) -> Result<(), SchedulerError> {
+        let forwarding_enabled = self.forwarder.is_some();
         match decision {
             BufferedPacketsDecision::Consume(bank_start) => {
                 let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule(
@@ -185,16 +186,30 @@ impl SchedulerController {
                 });
             }
             BufferedPacketsDecision::Forward => {
-                let (_, forward_time_us) = measure_us!(self.forward_packets(false));
-                self.timing_metrics.update(|timing_metrics| {
-                    saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us);
-                });
+                if forwarding_enabled {
+                    let (_, forward_time_us) = measure_us!(self.forward_packets(false));
+                    self.timing_metrics.update(|timing_metrics| {
+                        saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us);
+                    });
+                } else {
+                    let (_, clear_time_us) = measure_us!(self.clear_container());
+                    self.timing_metrics.update(|timing_metrics| {
+                        saturating_add_assign!(timing_metrics.clear_time_us, clear_time_us);
+                    });
+                }
             }
             BufferedPacketsDecision::ForwardAndHold => {
-                let (_, forward_time_us) = measure_us!(self.forward_packets(true));
-                self.timing_metrics.update(|timing_metrics| {
-                    saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us);
-                });
+                if forwarding_enabled {
+                    let (_, forward_time_us) = measure_us!(self.forward_packets(true));
+                    self.timing_metrics.update(|timing_metrics| {
+                        saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us);
+                    });
+                } else {
+                    let (_, clean_time_us) = measure_us!(self.clean_queue());
+                    self.timing_metrics.update(|timing_metrics| {
+                        saturating_add_assign!(timing_metrics.clean_time_us, clean_time_us);
+                    });
+                }
             }
             BufferedPacketsDecision::Hold => {}
         }
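With the forwarder optional, the decision handling forks: `Forward` either forwards or drops the whole container, while `ForwardAndHold` either forwards or merely cleans expired/processed entries. A simplified sketch of that control flow; the enum variants mirror `BufferedPacketsDecision`, everything else is a hypothetical stand-in:

```rust
// Simplified sketch of the Forward / ForwardAndHold branching above.
enum Decision { Forward, ForwardAndHold, Hold }

#[derive(Default)]
struct Container {
    pending: Vec<u64>, // transaction ids, for illustration
}

impl Container {
    fn forward(&mut self, hold: bool) {
        // Real code batches, filters, and forwards packets; elided here.
        if !hold {
            self.pending.clear();
        }
    }
    fn clear(&mut self) {
        // Forward with no forwarder: drop everything pending.
        self.pending.clear();
    }
    fn clean(&mut self) {
        // ForwardAndHold with no forwarder: keep only still-valid entries.
        self.pending.retain(|id| id % 2 == 0); // stand-in for expiry checks
    }
}

fn handle(decision: Decision, container: &mut Container, forwarding_enabled: bool) {
    match decision {
        Decision::Forward if forwarding_enabled => container.forward(false),
        Decision::Forward => container.clear(),
        Decision::ForwardAndHold if forwarding_enabled => container.forward(true),
        Decision::ForwardAndHold => container.clean(),
        Decision::Hold => {}
    }
}

fn main() {
    let mut c = Container { pending: vec![1, 2, 3, 4] };
    handle(Decision::ForwardAndHold, &mut c, false);
    assert_eq!(c.pending, vec![2, 4]);
}
```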
@@ -233,6 +248,7 @@ impl SchedulerController {
         let start = Instant::now();
         let bank = self.bank_forks.read().unwrap().working_bank();
         let feature_set = &bank.feature_set;
+        let forwarder = self.forwarder.as_mut().expect("forwarder must exist");

         // Pop from the container in chunks, filter using bank checks, then attempt to forward.
         // This doubles as a way to clean the queue as well as forwarding transactions.
@@ -281,7 +297,7 @@ impl SchedulerController {
                 // If not already forwarded and can be forwarded, add to forwardable packets.
                 if state.should_forward()
-                    && self.forwarder.try_add_packet(
+                    && forwarder.try_add_packet(
                         sanitized_transaction,
                         immutable_packet,
                         feature_set,
         }

         // Forward each batch of transactions
-        self.forwarder
-            .forward_batched_packets(&ForwardOption::ForwardTransaction);
-        self.forwarder.clear_batches();
+        forwarder.forward_batched_packets(&ForwardOption::ForwardTransaction);
+        forwarder.clear_batches();

         // If we hit the time limit. Drop everything that was not checked/processed.
         // If we cannot run these simple checks in time, then we cannot run them during
@@ -329,7 +344,6 @@ impl SchedulerController {
     /// Clears the transaction state container.
     /// This only clears pending transactions, and does **not** clear in-flight transactions.
-    #[allow(dead_code)]
     fn clear_container(&mut self) {
         let mut num_dropped_on_clear: usize = 0;
         while let Some(id) = self.container.pop() {
@@ -345,7 +359,6 @@ impl SchedulerController {
     /// Clean unprocessable transactions from the queue. These will be transactions that are
     /// expired, already processed, or are no longer sanitizable.
     /// This only clears pending transactions, and does **not** clear in-flight transactions.
-    #[allow(dead_code)]
     fn clean_queue(&mut self) {
         // Clean up any transactions that have already been processed, are too old, or do not have
         // valid nonce accounts.
@@ -423,17 +436,19 @@ impl SchedulerController {
         let remaining_queue_capacity = self.container.remaining_queue_capacity();

         const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
-        let recv_timeout = match decision {
-            BufferedPacketsDecision::Consume(_) => {
+        let (recv_timeout, should_buffer) = match decision {
+            BufferedPacketsDecision::Consume(_) => (
                 if self.container.is_empty() {
                     MAX_PACKET_RECEIVE_TIME
                 } else {
                     Duration::ZERO
-                }
+                },
+                true,
+            ),
+            BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarder.is_some()),
+            BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {
+                (MAX_PACKET_RECEIVE_TIME, true)
             }
-            BufferedPacketsDecision::Forward
-            | BufferedPacketsDecision::ForwardAndHold
-            | BufferedPacketsDecision::Hold => MAX_PACKET_RECEIVE_TIME,
         };

         let (received_packet_results, receive_time_us) = measure_us!(self
@@ -455,11 +470,21 @@ impl SchedulerController {
                     saturating_add_assign!(count_metrics.num_received, num_received_packets);
                 });

-                let (_, buffer_time_us) =
-                    measure_us!(self.buffer_packets(receive_packet_results.deserialized_packets));
-                self.timing_metrics.update(|timing_metrics| {
-                    saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
-                });
+                if should_buffer {
+                    let (_, buffer_time_us) = measure_us!(
+                        self.buffer_packets(receive_packet_results.deserialized_packets)
+                    );
+                    self.timing_metrics.update(|timing_metrics| {
+                        saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
+                    });
+                } else {
+                    self.count_metrics.update(|count_metrics| {
+                        saturating_add_assign!(
+                            count_metrics.num_dropped_on_receive,
+                            num_received_packets
+                        );
+                    });
+                }
             }
             Err(RecvTimeoutError::Timeout) => {}
             Err(RecvTimeoutError::Disconnected) => return false,
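The receive path now computes a pair per decision: how long to block on the packet channel, and whether received packets should be buffered at all (under `Forward` with no forwarder they are counted as dropped instead of queued). A condensed sketch of that selection, with hypothetical stand-ins for the decision type:

```rust
use std::time::Duration;

// Condensed sketch of the (recv_timeout, should_buffer) selection above.
enum Decision { Consume, Forward, ForwardAndHold, Hold }

const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);

fn receive_policy(
    decision: &Decision,
    container_empty: bool,
    forwarder_present: bool,
) -> (Duration, bool) {
    match decision {
        // While consuming, only block on the channel when nothing is queued.
        Decision::Consume => (
            if container_empty { MAX_PACKET_RECEIVE_TIME } else { Duration::ZERO },
            true,
        ),
        // Without a forwarder, Forward-time packets are received but not buffered.
        Decision::Forward => (MAX_PACKET_RECEIVE_TIME, forwarder_present),
        Decision::ForwardAndHold | Decision::Hold => (MAX_PACKET_RECEIVE_TIME, true),
    }
}

fn main() {
    assert_eq!(
        receive_policy(&Decision::Forward, true, false),
        (MAX_PACKET_RECEIVE_TIME, false)
    );
}
```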
@@ -635,14 +660,13 @@ mod tests {
             banking_stage::{
                 consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
                 scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
-                tests::{create_slow_genesis_config, new_test_cluster_info},
+                tests::create_slow_genesis_config,
             },
             banking_trace::BankingPacketBatch,
             sigverify::SigverifyTracerPacketStats,
         },
         crossbeam_channel::{unbounded, Receiver, Sender},
         itertools::Itertools,
-        solana_client::connection_cache::ConnectionCache,
         solana_ledger::{
             blockstore::Blockstore, genesis_utils::GenesisConfigInfo, get_tmp_ledger_path_auto_delete,
             leader_schedule_cache::LeaderScheduleCache,
@@ -711,17 +735,6 @@ mod tests {
         let (consume_work_senders, consume_work_receivers) = create_channels(num_threads);
         let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();

-        let validator_keypair = Arc::new(Keypair::new());
-        let (_local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
-        let cluster_info = Arc::new(cluster_info);
-        let forwarder = Forwarder::new(
-            poh_recorder.clone(),
-            bank_forks.clone(),
-            cluster_info,
-            Arc::new(ConnectionCache::new("connection_cache_test")),
-            Arc::default(),
-        );
-
         let test_frame = TestFrame {
             bank,
             mint_keypair,
@@ -740,7 +753,7 @@ mod tests {
             bank_forks,
             PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
             vec![], // no actual workers with metrics to report, this can be empty
-            forwarder,
+            None,
         );

         (test_frame, scheduler_controller)
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs
index 7f05210b48e11c..bb8cbbe617396a 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs
@@ -271,6 +271,10 @@ pub struct SchedulerTimingMetricsInner {
     pub schedule_filter_time_us: u64,
     /// Time spent scheduling transactions.
     pub schedule_time_us: u64,
+    /// Time spent clearing transactions from the container.
+    pub clear_time_us: u64,
+    /// Time spent cleaning expired or processed transactions from the container.
+    pub clean_time_us: u64,
     /// Time spent forwarding transactions.
     pub forward_time_us: u64,
     /// Time spent receiving completed transactions.
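The two new counters follow the existing accumulate/report/reset pattern: `saturating_add_assign!` adds without overflow and each field is zeroed after reporting. A minimal sketch of that cycle, written without the macro (the `Timing` struct is a stand-in, not the real metrics type):

```rust
// Minimal sketch of the saturating accumulate / reset cycle the metrics use.
#[derive(Default)]
struct Timing {
    clear_time_us: u64,
    clean_time_us: u64,
}

impl Timing {
    fn record_clear(&mut self, us: u64) {
        // Equivalent to saturating_add_assign!(self.clear_time_us, us):
        // saturates at u64::MAX instead of wrapping on overflow.
        self.clear_time_us = self.clear_time_us.saturating_add(us);
    }
    fn report_and_reset(&mut self) -> (u64, u64) {
        let snapshot = (self.clear_time_us, self.clean_time_us);
        self.clear_time_us = 0;
        self.clean_time_us = 0;
        snapshot
    }
}

fn main() {
    let mut t = Timing::default();
    t.record_clear(u64::MAX);
    t.record_clear(10); // saturates rather than overflowing
    assert_eq!(t.report_and_reset().0, u64::MAX);
    assert_eq!(t.clear_time_us, 0); // reset after reporting
}
```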
@@ -312,6 +316,8 @@
             ("buffer_time_us", self.buffer_time_us, i64),
             ("schedule_filter_time_us", self.schedule_filter_time_us, i64),
             ("schedule_time_us", self.schedule_time_us, i64),
+            ("clear_time_us", self.clear_time_us, i64),
+            ("clean_time_us", self.clean_time_us, i64),
             ("forward_time_us", self.forward_time_us, i64),
             (
                 "receive_completed_time_us",
@@ -331,6 +337,8 @@
         self.buffer_time_us = 0;
         self.schedule_filter_time_us = 0;
         self.schedule_time_us = 0;
+        self.clear_time_us = 0;
+        self.clean_time_us = 0;
         self.forward_time_us = 0;
         self.receive_completed_time_us = 0;
     }
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index 96350cfbb26db9..3719404582de2f 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -114,6 +114,7 @@ impl Tpu {
         tpu_max_connections_per_ipaddr_per_minute: u64,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
         block_production_method: BlockProductionMethod,
+        enable_block_production_forwarding: bool,
         _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
     ) -> (Self, Vec<JoinHandle<()>>) {
         let TpuSockets {
@@ -244,6 +245,7 @@ impl Tpu {
             connection_cache.clone(),
             bank_forks.clone(),
             prioritization_fee_cache,
+            enable_block_production_forwarding,
         );

         let (entry_receiver, tpu_entry_notifier) =
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 743612a35f35fd..d89520c0c83163 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -266,6 +266,7 @@ pub struct ValidatorConfig {
     pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
     pub block_verification_method: BlockVerificationMethod,
     pub block_production_method: BlockProductionMethod,
+    pub enable_block_production_forwarding: bool,
     pub generator_config: Option<GeneratorConfig>,
     pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
     pub wen_restart_proto_path: Option<PathBuf>,
@@ -337,6 +338,7 @@ impl Default for ValidatorConfig {
             banking_trace_dir_byte_limit: 0,
             block_verification_method: BlockVerificationMethod::default(),
             block_production_method: BlockProductionMethod::default(),
+            enable_block_production_forwarding: false,
             generator_config: None,
             use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
             wen_restart_proto_path: None,
@@ -355,6 +357,7 @@ impl ValidatorConfig {
             enforce_ulimit_nofile: false,
             rpc_config: JsonRpcConfig::default_for_test(),
             block_production_method: BlockProductionMethod::default(),
+            enable_block_production_forwarding: true, // enable forwarding by default for tests
             replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
             replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
                 .expect("thread count is non-zero"),
@@ -1432,6 +1435,7 @@ impl Validator {
             tpu_max_connections_per_ipaddr_per_minute,
             &prioritization_fee_cache,
             config.block_production_method.clone(),
+            config.enable_block_production_forwarding,
             config.generator_config.clone(),
         );
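The flag travels `ValidatorConfig` → `Tpu::new` → `BankingStage::new`, defaulting to `false` in production and to `true` in the test constructor. A toy sketch of that threading; all types here are stand-ins for the real `ValidatorConfig`/`Tpu`/`BankingStage`:

```rust
// Toy sketch of how the flag is threaded from config down to the banking stage.
struct Config {
    enable_block_production_forwarding: bool,
}

impl Config {
    fn default_for_test() -> Self {
        // Mirrors ValidatorConfig::default_for_test: tests forward by default.
        Self { enable_block_production_forwarding: true }
    }
}

struct BankingStage {
    forwarding: bool,
}

fn spawn_tpu(config: &Config) -> BankingStage {
    // Tpu::new simply passes the flag through to BankingStage::new.
    BankingStage { forwarding: config.enable_block_production_forwarding }
}

fn main() {
    assert!(spawn_tpu(&Config::default_for_test()).forwarding);
    assert!(!spawn_tpu(&Config { enable_block_production_forwarding: false }).forwarding);
}
```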
diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs
index c01106bda4d4e5..a2366eb41489c8 100644
--- a/local-cluster/src/validator_configs.rs
+++ b/local-cluster/src/validator_configs.rs
@@ -64,6 +64,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
         block_verification_method: config.block_verification_method.clone(),
         block_production_method: config.block_production_method.clone(),
+        enable_block_production_forwarding: config.enable_block_production_forwarding,
         generator_config: config.generator_config.clone(),
         use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
         wen_restart_proto_path: config.wen_restart_proto_path.clone(),
diff --git a/validator/src/cli.rs b/validator/src/cli.rs
index d2b70583d20592..e769d11c3c3394 100644
--- a/validator/src/cli.rs
+++ b/validator/src/cli.rs
@@ -1533,6 +1533,16 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .possible_values(BlockProductionMethod::cli_names())
                 .help(BlockProductionMethod::cli_message()),
         )
+        .arg(
+            Arg::with_name("disable_block_production_forwarding")
+                .long("disable-block-production-forwarding")
+                .requires("staked_nodes_overrides")
+                .takes_value(false)
+                .help("Disable forwarding of non-vote transactions in block production. \
+                       Forwarding is disabled by default and is only enabled when \
+                       \"staked-nodes-overrides\" is set; this flag disables forwarding \
+                       even when \"staked-nodes-overrides\" is set."),
+        )
         .arg(
             Arg::with_name("unified_scheduler_handler_threads")
                 .long("unified-scheduler-handler-threads")
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 8fdf14eabfc00b..99abf814aed687 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -957,10 +957,10 @@ pub fn main() {
         .value_of("staked_nodes_overrides")
         .map(str::to_string);
     let staked_nodes_overrides = Arc::new(RwLock::new(
-        match staked_nodes_overrides_path {
+        match &staked_nodes_overrides_path {
             None => StakedNodesOverrides::default(),
-            Some(p) => load_staked_nodes_overrides(&p).unwrap_or_else(|err| {
-                error!("Failed to load stake-nodes-overrides from {}: {}", &p, err);
+            Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
+                error!("Failed to load stake-nodes-overrides from {}: {}", p, err);
                 clap::Error::with_description(
                     "Failed to load configuration of stake-nodes-overrides argument",
                     clap::ErrorKind::InvalidValue,
@@ -1769,6 +1769,10 @@ pub fn main() {
         BlockProductionMethod
     )
     .unwrap_or_default();
+    validator_config.enable_block_production_forwarding = staked_nodes_overrides_path
+        .as_ref()
+        .map(|_| !matches.is_present("disable_block_production_forwarding"))
+        .unwrap_or_default();
     validator_config.unified_scheduler_handler_threads =
         value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
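The resulting policy: forwarding is on only when `--staked-nodes-overrides` is given and `--disable-block-production-forwarding` is not (clap's `requires` already rejects the disable flag on its own). A small sketch of the same boolean, with the truth table spelled out; the function name is hypothetical:

```rust
// Sketch of the enable_block_production_forwarding computation above.
fn forwarding_enabled(staked_nodes_overrides_path: Option<&str>, disable_flag: bool) -> bool {
    // overrides set, flag absent  -> true
    // overrides set, flag present -> false
    // overrides unset             -> false (the flag alone is rejected by clap's `requires`)
    staked_nodes_overrides_path
        .map(|_| !disable_flag)
        .unwrap_or_default()
}

fn main() {
    assert!(forwarding_enabled(Some("overrides.yml"), false));
    assert!(!forwarding_enabled(Some("overrides.yml"), true));
    assert!(!forwarding_enabled(None, false));
}
```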