diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 57f768e2773cd5..e577b2d77f9743 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -8,7 +8,9 @@ use { solana_client::connection_cache::ConnectionCache, solana_core::{ banking_stage::BankingStage, - banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, + banking_trace::{ + BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + }, validator::BlockProductionMethod, }, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -440,9 +442,14 @@ fn main() { BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let cluster_info = { let keypair = Arc::new(Keypair::new()); let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0f449719ce34cb..18adb713b5b360 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -2,7 +2,7 @@ #![feature(test)] use { - solana_core::validator::BlockProductionMethod, + solana_core::{banking_trace::Channels, validator::BlockProductionMethod}, solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction}, }; @@ -211,9 +211,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { genesis_config.ticks_per_slot = 10_000; let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench diff --git a/core/benches/banking_trace.rs b/core/benches/banking_trace.rs index fb93deebc17ae2..66bfc84c630436 100644 --- a/core/benches/banking_trace.rs +++ b/core/benches/banking_trace.rs @@ -7,7 +7,7 @@ use { for_test::{ drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer, }, - receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, + receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels, TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, }, std::{ @@ -35,7 +35,11 @@ fn black_box_packet_batch(packet_batch: BankingPacketBatch) -> TracerThreadResul fn bench_banking_tracer_main_thread_overhead_noop_baseline(bencher: &mut Bencher) { let exit = Arc::::default(); let tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -64,7 +68,11 @@ fn bench_banking_tracer_main_thread_overhead_under_peak_write(bencher: &mut Benc BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -101,7 +109,11 @@ fn bench_banking_tracer_main_thread_overhead_under_sustained_write(bencher: &mut 1024 * 1024, // cause more frequent trace file rotation ))) .unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(false); let exit_for_dummy_thread = exit.clone(); let dummy_main_thread = thread::spawn(move || { @@ -142,7 +154,11 @@ fn bench_banking_tracer_background_thread_throughput(bencher: &mut Bencher) { let (tracer, tracer_thread) = BankingTracer::new(Some((&path, exit.clone(), 50 * 1024 * 1024))).unwrap(); - let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + .. + } = tracer.create_channels(false); let dummy_main_thread = thread::spawn(move || { receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 6e5113ded67336..fca5346726947b 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -3,8 +3,9 @@ use { crate::{ banking_stage::{BankingStage, LikeClusterInfo}, banking_trace::{ - BankingPacketBatch, BankingTracer, ChannelLabel, TimedTracedEvent, TracedEvent, - TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME, + BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent, + TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + BASENAME, }, validator::BlockProductionMethod, }, @@ -758,9 +759,14 @@ impl BankingSimulator { BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ); - let (non_vote_sender, non_vote_receiver) = retracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = retracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = retracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = retracer.create_channels(false); let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim")); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 49ccdb6ae15eff..1fd1652887f41d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -813,7 +813,7 @@ impl BankingStage { mod tests { use { super::*, - crate::banking_trace::{BankingPacketBatch, BankingTracer}, + crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_entry::entry::{self, Entry, EntrySlice}, @@ -874,10 +874,14 @@ mod tests { let genesis_config = create_genesis_config(2).genesis_config; let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -926,10 +930,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1004,10 +1012,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1138,7 +1150,14 @@ mod tests { .. } = create_slow_genesis_config(2); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -1168,9 +1187,6 @@ mod tests { .send(BankingPacketBatch::new((packet_batches, None))) .unwrap(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -1361,10 +1377,14 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let start_hash = bank.last_blockhash(); let banking_tracer = BankingTracer::new_disabled(); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index 6e0797c8c3842f..ba5b1e79e49fd7 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -178,6 +178,15 @@ pub fn receiving_loop_with_minimized_sender_overhead( Ok(()) } +pub struct Channels { + pub non_vote_sender: BankingPacketSender, + pub non_vote_receiver: BankingPacketReceiver, + pub tpu_vote_sender: BankingPacketSender, + pub tpu_vote_receiver: BankingPacketReceiver, + pub gossip_vote_sender: BankingPacketSender, + pub gossip_vote_receiver: BankingPacketReceiver, +} + impl BankingTracer { pub fn new( maybe_config: Option<(&PathBuf, Arc, DirByteLimit)>, @@ -220,22 +229,85 @@ impl BankingTracer { self.active_tracer.is_some() } + pub fn create_channels(&self, unify_channels: bool) -> Channels { + if unify_channels { + // Returning the same channel is needed when unified scheduler supports block + // production because unified scheduler doesn't distinguish them and treats them as + // unified as the single source of incoming transactions. This is to reduce the number + // of recv operation per loop and load balance evenly as much as possible there. + let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote(); + // Tap into some private helper fns so that banking trace labelling works as before. + let (tpu_vote_sender, tpu_vote_receiver) = + self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver); + let (gossip_vote_sender, gossip_vote_receiver) = + self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver); + + Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } + } else { + let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote(); + + Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } + } + } + fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) { Self::channel(label, self.active_tracer.as_ref().cloned()) } - pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::NonVote) } - pub fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::TpuVote) } - pub fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { self.create_channel(ChannelLabel::GossipVote) } + fn create_unified_channel_tpu_vote( + &self, + sender: &TracedSender, + receiver: &BankingPacketReceiver, + ) -> (BankingPacketSender, BankingPacketReceiver) { + Self::channel_inner( + ChannelLabel::TpuVote, + self.active_tracer.as_ref().cloned(), + sender.sender.clone(), + receiver.clone(), + ) + } + + fn create_unified_channel_gossip_vote( + &self, + sender: &TracedSender, + receiver: &BankingPacketReceiver, + ) -> (BankingPacketSender, BankingPacketReceiver) { + Self::channel_inner( + ChannelLabel::GossipVote, + self.active_tracer.as_ref().cloned(), + sender.sender.clone(), + receiver.clone(), + ) + } + pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) { self.trace_event(|| { TimedTracedEvent( @@ -264,6 +336,15 @@ impl BankingTracer { active_tracer: Option, ) -> (TracedSender, Receiver) { let (sender, receiver) = unbounded(); + Self::channel_inner(label, active_tracer, sender, receiver) + } + + fn channel_inner( + label: ChannelLabel, + active_tracer: Option, + sender: Sender, + receiver: BankingPacketReceiver, + ) -> (TracedSender, Receiver) { (TracedSender::new(label, sender, active_tracer), receiver) } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 091a5901c2311e..d715bb5c7b0534 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -5,7 +5,7 @@ pub use solana_sdk::net::DEFAULT_TPU_COALESCE; use { crate::{ banking_stage::BankingStage, - banking_trace::{BankingTracer, TracerThread}, + banking_trace::{BankingTracer, Channels, TracerThread}, cluster_info_vote_listener::{ ClusterInfoVoteListener, DuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, @@ -156,7 +156,14 @@ impl Tpu { shared_staked_nodes_overrides, ); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let Channels { + non_vote_sender, + non_vote_receiver, + tpu_vote_sender, + tpu_vote_receiver, + gossip_vote_sender, + gossip_vote_receiver, + } = banking_tracer.create_channels(false); // Streamer for Votes: let SpawnServerResult { @@ -235,8 +242,6 @@ impl Tpu { SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") }; - let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); - let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); SigVerifyStage::new( @@ -247,8 +252,6 @@ impl Tpu { ) }; - let (gossip_vote_sender, gossip_vote_receiver) = - banking_tracer.create_channel_gossip_vote(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit.clone(), cluster_info.clone(),