Apply misc small cleanups to unified scheduler #4080

Merged (1 commit, Dec 12, 2024)
core/src/banking_simulation.rs: 9 changes (2 additions, 7 deletions)
@@ -488,8 +488,7 @@ impl SimulatorLoop {
.bank_forks
.read()
.unwrap()
- .working_bank_with_scheduler()
- .clone_with_scheduler();
+ .working_bank_with_scheduler();
self.poh_recorder
.write()
.unwrap()
@@ -676,11 +675,7 @@ impl BankingSimulator {
let parent_slot = self.parent_slot().unwrap();
let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time;
let freeze_time_by_slot = self.banking_trace_events.freeze_time_by_slot;
- let bank = bank_forks
-     .read()
-     .unwrap()
-     .working_bank_with_scheduler()
-     .clone_with_scheduler();
+ let bank = bank_forks.read().unwrap().working_bank_with_scheduler();

let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
assert_eq!(parent_slot, bank.slot());
runtime/src/bank_forks.rs: 4 changes (2 additions, 2 deletions)
@@ -284,8 +284,8 @@ impl BankForks {
self[self.highest_slot()].clone()
}

- pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler {
-     &self.banks[&self.highest_slot()]
+ pub fn working_bank_with_scheduler(&self) -> BankWithScheduler {
+     self.banks[&self.highest_slot()].clone_with_scheduler()
Comment on lines +287 to +288 (Member Author):

it turned out that all call sites ended up calling .clone_with_scheduler() immediately after this, so just do it inside here.

also, this makes it consistent with its cousin (BankForks::working_bank), which doesn't return a ref either. (See the sketch after this file's diff.)

}

/// Register to be notified when a bank has been dumped (due to duplicate block handling)
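To make the effect of this change concrete, here is a minimal, self-contained sketch of the new calling pattern. The types below are simplified stand-ins for the real BankForks and BankWithScheduler (the real ones carry banks, schedulers, locks, and more), so this only illustrates the ownership change, not the actual runtime code.

use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

// Hypothetical stand-ins for the real runtime types, only to show the calling pattern.
#[derive(Clone)]
struct BankWithScheduler {
    slot: u64,
}

impl BankWithScheduler {
    fn clone_with_scheduler(&self) -> BankWithScheduler {
        self.clone()
    }

    fn slot(&self) -> u64 {
        self.slot
    }
}

struct BankForks {
    banks: HashMap<u64, BankWithScheduler>,
}

impl BankForks {
    fn highest_slot(&self) -> u64 {
        *self.banks.keys().max().expect("at least one bank")
    }

    // Before this PR the accessor returned `&BankWithScheduler`, so every caller
    // had to tack on `.clone_with_scheduler()` itself. After the PR the clone
    // happens here and an owned value is returned, matching
    // `BankForks::working_bank`, which also returns by value.
    fn working_bank_with_scheduler(&self) -> BankWithScheduler {
        self.banks[&self.highest_slot()].clone_with_scheduler()
    }
}

fn main() {
    let bank_forks = Arc::new(RwLock::new(BankForks {
        banks: HashMap::from([(488, BankWithScheduler { slot: 488 })]),
    }));

    // Call sites shrink from
    //     .working_bank_with_scheduler().clone_with_scheduler()
    // to just:
    let bank = bank_forks.read().unwrap().working_bank_with_scheduler();
    assert_eq!(bank.slot(), 488);
}

The design point from the comment above is simply that the clone now happens once, inside the accessor, instead of at every call site.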
unified-scheduler-pool/src/lib.rs: 95 changes (48 additions, 47 deletions)
@@ -551,7 +551,7 @@ mod chained_channel {

pub(super) fn send_chained_channel(
&mut self,
- context: C,
+ context: &C,
Member Author: taking a ref here will soon be desired in an upcoming PR...

the only difference here is that context is not dropped at the end of this function. (See the sketch after this hunk.)

count: usize,
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
let (chained_sender, chained_receiver) = crossbeam_channel::unbounded();
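To illustrate the drop-semantics point in the comment above, here is a generic sketch, unrelated to the scheduler types, showing that a by-value argument is dropped when the callee returns, while a by-reference argument leaves the caller's value alive:

// A toy type that logs when it is dropped, to make the difference visible.
struct Context(&'static str);

impl Drop for Context {
    fn drop(&mut self) {
        println!("dropping context {}", self.0);
    }
}

// By value: `ctx` is moved in and dropped when this function returns.
fn send_by_value(ctx: Context) {
    println!("sending {}", ctx.0);
} // `ctx` dropped here

// By reference: the caller keeps ownership; nothing is dropped here.
fn send_by_ref(ctx: &Context) {
    println!("sending {}", ctx.0);
}

fn main() {
    let a = Context("a");
    send_by_value(a); // "dropping context a" is printed immediately
    // `a` is moved and can no longer be used here.

    let b = Context("b");
    send_by_ref(&b); // `b` stays alive and usable afterwards
    println!("b is still alive");
} // `b` dropped here

Presumably that is what the upcoming PR wants: with `&C`, the caller keeps the context alive after the send.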
@@ -771,7 +771,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded();
- let handler_count = pool.handler_count;

Self {
scheduler_id: pool.new_scheduler_id(),
@@ -782,7 +781,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_result_receiver,
session_result_with_timings: None,
scheduler_thread: None,
- handler_threads: Vec::with_capacity(handler_count),
+ handler_threads: vec![],
Member Author: handler_threads is reassigned in start_threads, so there is no need to reserve capacity here. Also, this isn't perf-sensitive code. (See the sketch after this hunk.)

}
}

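As a small aside on the comment above, here is a generic sketch (again not the real scheduler code) of why reserved capacity would be wasted when the field is later reassigned wholesale:

// A stand-in manager type; the real one stores JoinHandle<()>s for handler threads.
struct ThreadManagerSketch {
    handler_threads: Vec<String>,
}

impl ThreadManagerSketch {
    fn new() -> Self {
        Self {
            // Pre-reserving capacity here would allocate space that is never used,
            // because...
            handler_threads: vec![],
        }
    }

    fn start_threads(&mut self, handler_count: usize) {
        // ...this assignment replaces the whole Vec, so any capacity reserved in
        // `new()` would simply be dropped unused. Plain `vec![]` is enough there.
        self.handler_threads = (0..handler_count)
            .map(|i| format!("handler-{i}"))
            .collect();
    }
}

fn main() {
    let mut manager = ThreadManagerSketch::new();
    manager.start_threads(4);
    assert_eq!(manager.handler_threads.len(), 4);
}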
@@ -1101,7 +1100,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// enter into the preceding `while(!is_finished) {...}` loop again.
// Before that, propagate new SchedulingContext to handler threads
runnable_task_sender
-     .send_chained_channel(new_context, handler_count)
+     .send_chained_channel(&new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
@@ -1152,54 +1151,56 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of
// `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler
// thread for all-but-initial sessions.
Member Author: just indent stuff

(This hunk only changes `move || loop {` to `move || {` with an explicit inner `loop { ... }`, re-indenting the existing handler loop body one level; the resulting code is shown once below. A standalone sketch of the defer!/panicking pattern it uses follows this hunk.)

move || {
    loop {
        let (task, sender) = select_biased! {
            recv(runnable_task_receiver.for_select()) -> message => {
                let Ok(message) = message else {
                    break;
                };
                if let Some(task) = runnable_task_receiver.after_select(message) {
                    (task, &finished_blocked_task_sender)
                } else {
                    continue;
                }
            },
            recv(runnable_task_receiver.aux_for_select()) -> task => {
                if let Ok(task) = task {
                    (task, &finished_idle_task_sender)
                } else {
                    runnable_task_receiver.never_receive_from_aux();
                    continue;
                }
            },
        };
        defer! {
            if !thread::panicking() {
                return;
            }

            // The scheduler thread can't detect panics in handler threads with
            // disconnected channel errors, unless all of them has died. So, send an
            // explicit Err promptly.
            let current_thread = thread::current();
            error!("handler thread is panicking: {:?}", current_thread);
            if sender.send(Err(HandlerPanicked)).is_ok() {
                info!("notified a panic from {:?}", current_thread);
            } else {
                // It seems that the scheduler thread has been aborted already...
                warn!("failed to notify a panic from {:?}", current_thread);
            }
        }
        let mut task = ExecutedTask::new_boxed(task);
        Self::execute_task_with_handler(
            runnable_task_receiver.context(),
            &mut task,
            &pool.handler_context,
        );
        if sender.send(Ok(task)).is_err() {
            warn!("handler_thread: scheduler thread aborted...");
            break;
        }
    }
};

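For readers unfamiliar with the panic-notification pattern used in the defer! block above, here is a minimal, self-contained sketch of the same idea. It assumes the scopeguard and crossbeam-channel crates; HandlerPanicked is a stand-in unit struct here, not the scheduler's real error type.

use crossbeam_channel::unbounded;
use scopeguard::defer;
use std::thread;

#[derive(Debug)]
struct HandlerPanicked; // stand-in for the scheduler's real error type

fn main() {
    let (sender, receiver) = unbounded::<Result<u64, HandlerPanicked>>();

    let handler = thread::spawn(move || {
        // Runs when this closure unwinds; does nothing on a normal return.
        defer! {
            if !thread::panicking() {
                return;
            }
            // Proactively tell the other side we died, instead of relying on it
            // noticing a disconnected channel much later.
            let _ = sender.send(Err(HandlerPanicked));
        }

        // Simulate a task that blows up mid-execution.
        panic!("boom");
    });

    // The receiving side learns about the panic via an explicit Err message.
    assert!(matches!(receiver.recv(), Ok(Err(HandlerPanicked))));
    assert!(handler.join().is_err());
    println!("panic was reported explicitly over the channel");
}

The point, as the in-code comment says, is that the scheduler side gets an explicit Err promptly rather than having to infer a handler panic from a disconnected channel.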
@@ -1441,7 +1442,7 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {

impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
where
- S: SpawnableScheduler<TH, Inner = PooledSchedulerInner<S, TH>>,
+ S: SpawnableScheduler<TH, Inner = Self>,
Member Author: see 2 lines above... ;)

TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {