From 7957cb75e4f3db0bf8cd3ae34589c3873a6f0252 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 10 Dec 2024 14:29:25 +0000 Subject: [PATCH 1/2] Define BankingTracer::create_channels() --- banking-bench/src/main.rs | 15 ++++-- core/benches/banking_stage.rs | 13 +++-- core/benches/banking_trace.rs | 26 +++++++-- core/src/banking_simulation.rs | 16 ++++-- core/src/banking_stage.rs | 62 ++++++++++++++-------- core/src/banking_trace.rs | 88 +++++++++++++++++++++++++++++-- core/src/tpu.rs | 15 +++--- unified-scheduler-pool/src/lib.rs | 5 ++ 8 files changed, 192 insertions(+), 48 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 57f768e2773cd5..35b32b399123a3 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -8,7 +8,9 @@ use { solana_client::connection_cache::ConnectionCache, solana_core::{ banking_stage::BankingStage, - banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, + banking_trace::{ + BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + }, validator::BlockProductionMethod, }, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -440,9 +442,14 @@ fn main() { BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None /*scheduler_pool.as_ref()*/); let cluster_info = { let keypair = Arc::new(Keypair::new()); let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0f449719ce34cb..1f261c9443c13f 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -2,7 +2,7 @@ #![feature(test)] use { - solana_core::validator::BlockProductionMethod, + solana_core::{banking_trace::Channels, validator::BlockProductionMethod}, solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction}, }; @@ -211,9 +211,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { genesis_config.ticks_per_slot = 10_000; let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench diff --git a/core/benches/banking_trace.rs b/core/benches/banking_trace.rs index fb93deebc17ae2..34ab2aaf78f640 100644 --- a/core/benches/banking_trace.rs +++ b/core/benches/banking_trace.rs @@ -7,7 +7,7 @@ use { for_test::{ drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer, }, - receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, + receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels, TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, }, std::{ @@ -35,7 +35,11 @@ fn black_box_packet_batch(packet_batch: BankingPacketBatch) -> TracerThreadResul fn bench_banking_tracer_main_thread_overhead_noop_baseline(bencher: &mut Bencher) { let exit = Arc::::default(); let tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(None); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -64,7 +68,11 @@ fn bench_banking_tracer_main_thread_overhead_under_peak_write(bencher: &mut Benc BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(None); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -101,7 +109,11 @@ fn bench_banking_tracer_main_thread_overhead_under_sustained_write(bencher: &mut 1024 * 1024, // cause more frequent trace file rotation ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(None); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -142,7 +154,11 @@ fn bench_banking_tracer_background_thread_throughput(bencher: &mut Bencher) { let (tracer, tracer_thread) = BankingTracer::new(Some((&path, exit.clone(), 50 * 1024 * 1024))).unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(None); let dummy_main_thread = thread::spawn(move || { receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 6e5113ded67336..957169c7509193 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -3,8 +3,9 @@ use { crate::{ banking_stage::{BankingStage, LikeClusterInfo}, banking_trace::{ - BankingPacketBatch, BankingTracer, ChannelLabel, TimedTracedEvent, TracedEvent, - TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME, + BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent, + TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + BASENAME, }, validator::BlockProductionMethod, }, @@ -758,9 +759,14 @@ impl BankingSimulator { BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ); - let (non_vote_sender, non_vote_receiver) = retracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = retracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = retracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = retracer.create_channels(None /*unified_scheduler_pool.as_ref()*/); let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim")); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 49ccdb6ae15eff..db630625739cff 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -813,7 +813,7 @@ impl BankingStage { mod tests { use { super::*, - crate::banking_trace::{BankingPacketBatch, BankingTracer}, + crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_entry::entry::{self, Entry, EntrySlice}, @@ -874,10 +874,14 @@ mod tests { let genesis_config = create_genesis_config(2).genesis_config; let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -926,10 +930,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1004,10 +1012,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1138,7 +1150,14 @@ mod tests { .. } = create_slow_genesis_config(2); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -1168,9 +1187,6 @@ mod tests { .send(BankingPacketBatch::new((packet_batches, None))) .unwrap(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -1361,10 +1377,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index 6e0797c8c3842f..41df5f0df8bf99 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -6,6 +6,7 @@ use { rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender}, solana_perf::packet::PacketBatch, solana_sdk::{hash::Hash, slot_history::Slot}, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{ fs::{create_dir_all, remove_dir_all}, io::{self, Write}, @@ -178,6 +179,15 @@ pub fn receiving_loop_with_minimized_sender_overhead( Ok(()) } +pub struct Channels { + pub non_vote_sender: BankingPacketSender, + pub non_vote_receiver: BankingPacketReceiver, + pub tpu_vote_sender: BankingPacketSender, + pub tpu_vote_receiver: BankingPacketReceiver, + pub gossip_vote_sender: BankingPacketSender, + pub gossip_vote_receiver: BankingPacketReceiver, +} + impl BankingTracer { pub fn new( maybe_config: Option<(&PathBuf, Arc, DirByteLimit)>, @@ -220,22 +230,85 @@ impl BankingTracer { self.active_tracer.is_some() } + pub fn create_channels(&self, pool: Option<&Arc>) -> Channels { + if let Some(true) = pool.map(|pool| pool.block_production_supported()) { + // Returning the same channel is needed when unified scheduler supports block + // production because unified scheduler doesn't distinguish them and treats them as + // unified as the single source of incoming transactions. This is to reduce the number + // of recv operation per loop and load balance evenly as much as possible there. + let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote(); + // Tap into some private helper fns so that banking trace labelling works as before. + let (tpu_vote_sender, tpu_vote_receiver) = + self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver); + let (gossip_vote_sender, gossip_vote_receiver) = + self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver); + + Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } + } else { + let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote(); + + Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } + } + } + fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) { Self::channel(label, self.active_tracer.as_ref().cloned()) } - pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::NonVote) } - pub fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::TpuVote) } - pub fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::GossipVote) } + fn create_unified_channel_tpu_vote( + &self, + sender: &TracedSender, + receiver: &BankingPacketReceiver, + ) -> (BankingPacketSender, BankingPacketReceiver) { + Self::channel_inner( + ChannelLabel::TpuVote, + self.active_tracer.as_ref().cloned(), + sender.sender.clone(), + receiver.clone(), + ) + } + + fn create_unified_channel_gossip_vote( + &self, + sender: &TracedSender, + receiver: &BankingPacketReceiver, + ) -> (BankingPacketSender, BankingPacketReceiver) { + Self::channel_inner( + ChannelLabel::GossipVote, + self.active_tracer.as_ref().cloned(), + sender.sender.clone(), + receiver.clone(), + ) + } + pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) { self.trace_event(|| { TimedTracedEvent( @@ -264,6 +337,15 @@ impl BankingTracer { active_tracer: Option, ) -> (TracedSender, Receiver) { let (sender, receiver) = unbounded(); + Self::channel_inner(label, active_tracer, sender, receiver) + } + + fn channel_inner( + label: ChannelLabel, + active_tracer: Option, + sender: Sender, + receiver: BankingPacketReceiver, + ) -> (TracedSender, Receiver) { (TracedSender::new(label, sender, active_tracer), receiver) } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 091a5901c2311e..b2340f00e63794 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -5,7 +5,7 @@ pub use solana_sdk::net::DEFAULT_TPU_COALESCE; use { crate::{ banking_stage::BankingStage, - banking_trace::{BankingTracer, TracerThread}, + banking_trace::{BankingTracer, Channels, TracerThread}, cluster_info_vote_listener::{ ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, @@ -156,7 +156,14 @@ impl Tpu { shared_staked_nodes_overrides, ); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(None /*unified_scheduler_pool.as_ref()*/); // Streamer for Votes: let SpawnServerResult { @@ -235,8 +242,6 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") }; - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); SigVerifyStage::new( @@ -247,8 +252,6 @@ impl Tpu { ) }; - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit.clone(), cluster_info.clone(), diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 4a27de9496bb8e..52f32e2c4d2a6d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -354,6 +354,11 @@ where self.scheduler_inners.lock().expect("not poisoned").len() } + pub fn block_production_supported(&self) -> bool { + // to be implemented later, for now always pretend as if block production isn't supported + false + } + pub fn default_handler_count() -> usize { Self::calculate_default_handler_count( thread::available_parallelism() From 1d014047268da11799cbdec23734dcf5f5d78fd2 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 12 Dec 2024 15:34:22 +0000 Subject: [PATCH 2/2] Make create_channels() take a bool --- banking-bench/src/main.rs | 2 +- core/benches/banking_stage.rs | 2 +- core/benches/banking_trace.rs | 8 ++++---- core/src/banking_simulation.rs | 2 +- core/src/banking_stage.rs | 10 +++++----- core/src/banking_trace.rs | 5 ++--- core/src/tpu.rs | 2 +- unified-scheduler-pool/src/lib.rs | 5 ----- 8 files changed, 15 insertions(+), 21 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 35b32b399123a3..e577b2d77f9743 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -449,7 +449,7 @@ fn main() { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None /*scheduler_pool.as_ref()*/); + } = banking_tracer.create_channels(false); let cluster_info = { let keypair = Arc::new(Keypair::new()); let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1f261c9443c13f..18adb713b5b360 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -218,7 +218,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench diff --git a/core/benches/banking_trace.rs b/core/benches/banking_trace.rs index 34ab2aaf78f640..66bfc84c630436 100644 --- a/core/benches/banking_trace.rs +++ b/core/benches/banking_trace.rs @@ -39,7 +39,7 @@ fn bench_banking_tracer_main_thread_overhead_noop_baseline(bencher: &mut Bencher non_vote_sender, non_vote_receiver, .. - } = tracer.create_channels(None); + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -72,7 +72,7 @@ fn bench_banking_tracer_main_thread_overhead_under_peak_write(bencher: &mut Benc non_vote_sender, non_vote_receiver, .. - } = tracer.create_channels(None); + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -113,7 +113,7 @@ fn bench_banking_tracer_main_thread_overhead_under_sustained_write(bencher: &mut non_vote_sender, non_vote_receiver, .. - } = tracer.create_channels(None); + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -158,7 +158,7 @@ fn bench_banking_tracer_background_thread_throughput(bencher: &mut Bencher) { non_vote_sender, non_vote_receiver, .. - } = tracer.create_channels(None); + } = tracer.create_channels(false); let dummy_main_thread = thread::spawn(move || { receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 957169c7509193..fca5346726947b 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -766,7 +766,7 @@ impl BankingSimulator { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = retracer.create_channels(None /*unified_scheduler_pool.as_ref()*/); + } = retracer.create_channels(false); let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim")); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index db630625739cff..1fd1652887f41d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -881,7 +881,7 @@ mod tests { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -937,7 +937,7 @@ mod tests { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1019,7 +1019,7 @@ mod tests { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1157,7 +1157,7 @@ mod tests { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -1384,7 +1384,7 @@ mod tests { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None); + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index 41df5f0df8bf99..ba5b1e79e49fd7 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -6,7 +6,6 @@ use { rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender}, solana_perf::packet::PacketBatch, solana_sdk::{hash::Hash, slot_history::Slot}, - solana_unified_scheduler_pool::DefaultSchedulerPool, std::{ fs::{create_dir_all, remove_dir_all}, io::{self, Write}, @@ -230,8 +229,8 @@ impl BankingTracer { self.active_tracer.is_some() } - pub fn create_channels(&self, pool: Option<&Arc>) -> Channels { - if let Some(true) = pool.map(|pool| pool.block_production_supported()) { + pub fn create_channels(&self, unify_channels: bool) -> Channels { + if unify_channels { // Returning the same channel is needed when unified scheduler supports block // production because unified scheduler doesn't distinguish them and treats them as // unified as the single source of incoming transactions. This is to reduce the number diff --git a/core/src/tpu.rs b/core/src/tpu.rs index b2340f00e63794..d715bb5c7b0534 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -163,7 +163,7 @@ impl Tpu { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(None /*unified_scheduler_pool.as_ref()*/); + } = banking_tracer.create_channels(false); // Streamer for Votes: let SpawnServerResult { diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 52f32e2c4d2a6d..4a27de9496bb8e 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -354,11 +354,6 @@ where self.scheduler_inners.lock().expect("not poisoned").len() } - pub fn block_production_supported(&self) -> bool { - // to be implemented later, for now always pretend as if block production isn't supported - false - } - pub fn default_handler_count() -> usize { Self::calculate_default_handler_count( thread::available_parallelism()