diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 6e5113ded67336..6625011c88d0a7 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -488,8 +488,7 @@ impl SimulatorLoop { .bank_forks .read() .unwrap() - .working_bank_with_scheduler() - .clone_with_scheduler(); + .working_bank_with_scheduler(); self.poh_recorder .write() .unwrap() @@ -676,11 +675,7 @@ impl BankingSimulator { let parent_slot = self.parent_slot().unwrap(); let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time; let freeze_time_by_slot = self.banking_trace_events.freeze_time_by_slot; - let bank = bank_forks - .read() - .unwrap() - .working_bank_with_scheduler() - .clone_with_scheduler(); + let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); assert_eq!(parent_slot, bank.slot()); diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 83c2e0ab3fd675..1627f8113021db 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -284,8 +284,8 @@ impl BankForks { self[self.highest_slot()].clone() } - pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler { - &self.banks[&self.highest_slot()] + pub fn working_bank_with_scheduler(&self) -> BankWithScheduler { + self.banks[&self.highest_slot()].clone_with_scheduler() } /// Register to be notified when a bank has been dumped (due to duplicate block handling) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 11831c64e74848..a90a3c0b8a449d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -551,7 +551,7 @@ mod chained_channel { pub(super) fn send_chained_channel( &mut self, - context: C, + context: &C, count: usize, ) -> std::result::Result<(), SendError>> { let (chained_sender, chained_receiver) = crossbeam_channel::unbounded(); @@ -771,7 +771,6 @@ impl, TH: TaskHandler> ThreadManager { fn new(pool: Arc>) -> Self { let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded(); let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded(); - let handler_count = pool.handler_count; Self { scheduler_id: pool.new_scheduler_id(), @@ -782,7 +781,7 @@ impl, TH: TaskHandler> ThreadManager { session_result_receiver, session_result_with_timings: None, scheduler_thread: None, - handler_threads: Vec::with_capacity(handler_count), + handler_threads: vec![], } } @@ -1101,7 +1100,7 @@ impl, TH: TaskHandler> ThreadManager { // enter into the preceding `while(!is_finished) {...}` loop again. // Before that, propagate new SchedulingContext to handler threads runnable_task_sender - .send_chained_channel(new_context, handler_count) + .send_chained_channel(&new_context, handler_count) .unwrap(); result_with_timings = new_result_with_timings; } @@ -1152,54 +1151,56 @@ impl, TH: TaskHandler> ThreadManager { // 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of // `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler // thread for all-but-initial sessions. - move || loop { - let (task, sender) = select_biased! { - recv(runnable_task_receiver.for_select()) -> message => { - let Ok(message) = message else { - break; - }; - if let Some(task) = runnable_task_receiver.after_select(message) { - (task, &finished_blocked_task_sender) - } else { - continue; + move || { + loop { + let (task, sender) = select_biased! { + recv(runnable_task_receiver.for_select()) -> message => { + let Ok(message) = message else { + break; + }; + if let Some(task) = runnable_task_receiver.after_select(message) { + (task, &finished_blocked_task_sender) + } else { + continue; + } + }, + recv(runnable_task_receiver.aux_for_select()) -> task => { + if let Ok(task) = task { + (task, &finished_idle_task_sender) + } else { + runnable_task_receiver.never_receive_from_aux(); + continue; + } + }, + }; + defer! { + if !thread::panicking() { + return; } - }, - recv(runnable_task_receiver.aux_for_select()) -> task => { - if let Ok(task) = task { - (task, &finished_idle_task_sender) + + // The scheduler thread can't detect panics in handler threads with + // disconnected channel errors, unless all of them has died. So, send an + // explicit Err promptly. + let current_thread = thread::current(); + error!("handler thread is panicking: {:?}", current_thread); + if sender.send(Err(HandlerPanicked)).is_ok() { + info!("notified a panic from {:?}", current_thread); } else { - runnable_task_receiver.never_receive_from_aux(); - continue; + // It seems that the scheduler thread has been aborted already... + warn!("failed to notify a panic from {:?}", current_thread); } - }, - }; - defer! { - if !thread::panicking() { - return; } - - // The scheduler thread can't detect panics in handler threads with - // disconnected channel errors, unless all of them has died. So, send an - // explicit Err promptly. - let current_thread = thread::current(); - error!("handler thread is panicking: {:?}", current_thread); - if sender.send(Err(HandlerPanicked)).is_ok() { - info!("notified a panic from {:?}", current_thread); - } else { - // It seems that the scheduler thread has been aborted already... - warn!("failed to notify a panic from {:?}", current_thread); + let mut task = ExecutedTask::new_boxed(task); + Self::execute_task_with_handler( + runnable_task_receiver.context(), + &mut task, + &pool.handler_context, + ); + if sender.send(Ok(task)).is_err() { + warn!("handler_thread: scheduler thread aborted..."); + break; } } - let mut task = ExecutedTask::new_boxed(task); - Self::execute_task_with_handler( - runnable_task_receiver.context(), - &mut task, - &pool.handler_context, - ); - if sender.send(Ok(task)).is_err() { - warn!("handler_thread: scheduler thread aborted..."); - break; - } } }; @@ -1441,7 +1442,7 @@ impl InstalledScheduler for PooledScheduler { impl UninstalledScheduler for PooledSchedulerInner where - S: SpawnableScheduler>, + S: SpawnableScheduler, TH: TaskHandler, { fn return_to_pool(self: Box) {