Skip to content

Commit

Permalink
i#6831 sched refactor, step 5: Split check for whether to switch inputs (#7079)
Browse files Browse the repository at this point in the history

Splits the mode-specific middle of next_record() into a new virtual
method, check_for_input_switch(), which identifies whether to trigger a
switch and call pick_next_input_for_mode(). The new method is
implemented in each of the new subclass source files
scheduler_{dynamic,fixed,replay}.cpp.

Issue: #6831
  • Loading branch information
derekbruening authored Nov 13, 2024
1 parent 88abaaa commit 1977651
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 170 deletions.
117 changes: 117 additions & 0 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,123 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
return sched_type_t::STATUS_OK;
}

// Dynamic-scheduling check for whether the input currently running on an
// output should be switched out at this record.
//
// Parameters:
//   output         - ordinal of the output whose state in outputs_[] is
//                    consulted and updated.
//   record         - the record under consideration; it is tested for being
//                    an instruction boundary and for being a marker.
//   input          - the input being examined; its syscall-tracking and
//                    quantum-tracking fields are read and updated here.
//   cur_time       - the time value used for time-based quanta.
//   need_new_input - set to true when a switch is wanted; never cleared here.
//   preempt        - set to true only when a quantum (instruction or time)
//                    has expired; never cleared here.
//   blocked_time   - set to input->blocked_time when that is non-zero at the
//                    end of a syscall; it is also handed to
//                    syscall_incurs_switch(), which presumably fills it in
//                    (confirm against that helper).
//
// Returns STATUS_INVALID if time quanta are in use and cur_time is zero or
// earlier than the input's previously recorded time; otherwise STATUS_OK.
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time,
bool &need_new_input, bool &preempt, uint64_t &blocked_time)
{
// Outputs of record_type_is_marker() further below; only meaningful when
// that call reports that the record is a marker.
trace_marker_type_t marker_type;
uintptr_t marker_value;
// While regular traces typically always have a syscall marker when
// there's a maybe-blocking marker, some tests and synthetic traces have
// just the maybe so we check both.
if (input->processing_syscall || input->processing_maybe_blocking_syscall) {
// Wait until we're past all the markers associated with the syscall.
// XXX: We may prefer to stop before the return value marker for
// futex, or a kernel xfer marker, but our recorded format is on instr
// boundaries so we live with those being before the switch.
// XXX: Once we insert kernel traces, we may have to try harder
// to stop before the post-syscall records.
if (this->record_type_is_instr_boundary(record,
this->outputs_[output].last_record)) {
// The reasons for switching are checked in priority order: an explicit
// switch target, then a known blocked time, then an unscheduled input,
// and finally the generic syscall-latency test.
if (input->switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) {
// The switch request overrides any latency threshold.
need_new_input = true;
VPRINT(this, 3,
"next_record[%d]: direct switch on low-latency "
"syscall in "
"input %d\n",
output, input->index);
} else if (input->blocked_time > 0) {
// If we've found out another way that this input should
// block, use that time and do a switch.
need_new_input = true;
blocked_time = input->blocked_time;
VPRINT(this, 3, "next_record[%d]: blocked time set for input %d\n",
output, input->index);
} else if (input->unscheduled) {
need_new_input = true;
VPRINT(this, 3, "next_record[%d]: input %d going unscheduled\n", output,
input->index);
} else if (this->syscall_incurs_switch(input, blocked_time)) {
// Model as blocking and should switch to a different input.
need_new_input = true;
VPRINT(this, 3, "next_record[%d]: hit blocking syscall in input %d\n",
output, input->index);
}
// We have reached the instruction boundary that ends this syscall's
// records, so the per-syscall tracking state is cleared whether or not
// a switch was requested above.
input->processing_syscall = false;
input->processing_maybe_blocking_syscall = false;
input->pre_syscall_timestamp = 0;
input->syscall_timeout_arg = 0;
}
}
if (this->outputs_[output].hit_switch_code_end) {
// We have to delay so the end marker is still in_context_switch_code.
this->outputs_[output].in_context_switch_code = false;
this->outputs_[output].hit_switch_code_end = false;
// We're now back "on the clock".
if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME)
input->prev_time_in_quantum = cur_time;
// XXX: If we add a skip feature triggered on the output stream,
// we'll want to make sure skipping while in these switch and kernel
// sequences is handled correctly.
}
// Markers are handed to process_marker() before the quantum accounting
// below runs.
if (this->record_type_is_marker(record, marker_type, marker_value)) {
this->process_marker(*input, output, marker_type, marker_value);
}
// Instruction-count quanta: count each instruction boundary that is not in
// kernel code and preempt once the count exceeds the configured duration.
if (this->options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS &&
this->record_type_is_instr_boundary(record, this->outputs_[output].last_record) &&
!this->outputs_[output].in_kernel_code) {
++input->instrs_in_quantum;
if (input->instrs_in_quantum > this->options_.quantum_duration_instrs) {
// We again prefer to switch to another input even if the current
// input has the oldest timestamp, prioritizing context switches
// over timestamp ordering.
VPRINT(this, 4, "next_record[%d]: input %d hit end of instr quantum\n",
output, input->index);
preempt = true;
need_new_input = true;
input->instrs_in_quantum = 0;
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS];
}
} else if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME) {
// Time quanta: a zero time or one that precedes the previously recorded
// time cannot be used to compute an elapsed delta, so it is an error.
if (cur_time == 0 || cur_time < input->prev_time_in_quantum) {
VPRINT(this, 1,
"next_record[%d]: invalid time %" PRIu64 " vs start %" PRIu64 "\n",
output, cur_time, input->prev_time_in_quantum);
return sched_type_t::STATUS_INVALID;
}
// Accumulate the time elapsed since the previously recorded time and
// advance the recorded time to now; this happens on every record, not
// just on instruction boundaries.
input->time_spent_in_quantum += cur_time - input->prev_time_in_quantum;
input->prev_time_in_quantum = cur_time;
// Convert the accumulated time units to microseconds for comparison
// against the configured quantum length.
double elapsed_micros = static_cast<double>(input->time_spent_in_quantum) /
this->options_.time_units_per_us;
if (elapsed_micros >= this->options_.quantum_duration_us &&
// We only switch on instruction boundaries. We could possibly switch
// in between (e.g., scatter/gather long sequence of reads/writes) by
// setting input->switching_pre_instruction.
this->record_type_is_instr_boundary(record,
this->outputs_[output].last_record)) {
VPRINT(this, 4,
"next_record[%d]: input %d hit end of time quantum after %" PRIu64
"\n",
output, input->index, input->time_spent_in_quantum);
preempt = true;
need_new_input = true;
input->time_spent_in_quantum = 0;
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS];
}
}
// For sched_type_t::DEPENDENCY_TIMESTAMPS: enforcing asked-for
// context switch rates is more important than honoring precise
// trace-buffer-based timestamp inter-input dependencies so we do not end a
// quantum early due purely to timestamps.

return sched_type_t::STATUS_OK;
}

// Explicit instantiations of the dynamic scheduler for the two
// record-type/reader-type pairings: memref_t with reader_t, and
// trace_entry_t with record_reader_t.
template class scheduler_dynamic_tmpl_t<memref_t, reader_t>;
template class scheduler_dynamic_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;
Expand Down
12 changes: 12 additions & 0 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
return sched_type_t::STATUS_OK;
}

// Fixed-scheduling check for whether the current input should be switched
// out at this record.
//
// The only trigger here is a timestamp record while the scheduler is
// configured with DEPENDENCY_TIMESTAMPS; in that case need_new_input is set
// to true.  The output, cur_time, preempt, and blocked_time parameters are
// not used by this mode.  input->next_timestamp is handed to
// record_type_is_timestamp(), which presumably stores the timestamp value
// there (confirm against that helper).
//
// Always returns STATUS_OK.
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
    output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time,
    bool &need_new_input, bool &preempt, uint64_t &blocked_time)
{
    // Without timestamp dependencies there is nothing to check, and the
    // record must not be examined at all.
    if (this->options_.deps != sched_type_t::DEPENDENCY_TIMESTAMPS) {
        return sched_type_t::STATUS_OK;
    }
    // A timestamp record is a point at which to re-pick the input.
    if (this->record_type_is_timestamp(record, input->next_timestamp)) {
        need_new_input = true;
    }
    return sched_type_t::STATUS_OK;
}

// Explicit instantiations of the fixed scheduler for the two
// record-type/reader-type pairings: memref_t with reader_t, and
// trace_entry_t with record_reader_t.
template class scheduler_fixed_tmpl_t<memref_t, reader_t>;
template class scheduler_fixed_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;
Expand Down
171 changes: 7 additions & 164 deletions clients/drcachesim/scheduler/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3408,168 +3408,11 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
bool need_new_input = false;
bool preempt = false;
uint64_t blocked_time = 0;
uint64_t prev_time_in_quantum = 0;
// XXX i#6831: Refactor to use subclasses or templates to specialize
// scheduler code based on mapping options, to avoid these top-level
// conditionals in many functions? The next_record() and pick_next_input()
// could also be put into output_info_t, promoting it to a class and
// subclassing it per mapping mode.
if (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY) {
// Our own index is only modified by us so we can cache it here.
int record_index =
outputs_[output].record_index->load(std::memory_order_acquire);
assert(record_index >= 0);
if (record_index >= static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
} else if (outputs_[output].record[record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
need_new_input = true;
} else if (outputs_[output].record[record_index].type ==
schedule_record_t::SYNTHETIC_END) {
VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output);
} else {
const schedule_record_t &segment = outputs_[output].record[record_index];
assert(segment.type == schedule_record_t::DEFAULT);
uint64_t start = segment.value.start_instruction;
uint64_t stop = segment.stop_instruction;
// The stop is exclusive. 0 does mean to do nothing (easiest
// to have an empty record to share the next-entry for a start skip
// or other cases).
// Only check for stop when we've exhausted the queue, or we have
// a starter schedule with a 0,0 entry prior to a first skip entry
// (as just mentioned, it is easier to have a seemingly-redundant entry
// to get into the trace reading loop and then do something like a skip
// from the start rather than adding logic into the setup code).
if (get_instr_ordinal(*input) >= stop &&
(!input->cur_from_queue || (start == 0 && stop == 0))) {
VPRINT(this, 5,
"next_record[%d]: need new input: at end of segment in=%d "
"stop=%" PRId64 "\n",
output, input->index, stop);
need_new_input = true;
}
}
} else if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) {
trace_marker_type_t marker_type;
uintptr_t marker_value;
// While regular traces typically always have a syscall marker when
// there's a maybe-blocking marker, some tests and synthetic traces have
// just the maybe so we check both.
if (input->processing_syscall || input->processing_maybe_blocking_syscall) {
// Wait until we're past all the markers associated with the syscall.
// XXX: We may prefer to stop before the return value marker for
// futex, or a kernel xfer marker, but our recorded format is on instr
// boundaries so we live with those being before the switch.
// XXX: Once we insert kernel traces, we may have to try harder
// to stop before the post-syscall records.
if (record_type_is_instr_boundary(record, outputs_[output].last_record)) {
if (input->switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) {
// The switch request overrides any latency threshold.
need_new_input = true;
VPRINT(this, 3,
"next_record[%d]: direct switch on low-latency "
"syscall in "
"input %d\n",
output, input->index);
} else if (input->blocked_time > 0) {
// If we've found out another way that this input should
// block, use that time and do a switch.
need_new_input = true;
blocked_time = input->blocked_time;
VPRINT(this, 3,
"next_record[%d]: blocked time set for input %d\n", output,
input->index);
} else if (input->unscheduled) {
need_new_input = true;
VPRINT(this, 3, "next_record[%d]: input %d going unscheduled\n",
output, input->index);
} else if (syscall_incurs_switch(input, blocked_time)) {
// Model as blocking and should switch to a different input.
need_new_input = true;
VPRINT(this, 3,
"next_record[%d]: hit blocking syscall in input %d\n",
output, input->index);
}
input->processing_syscall = false;
input->processing_maybe_blocking_syscall = false;
input->pre_syscall_timestamp = 0;
input->syscall_timeout_arg = 0;
}
}
if (outputs_[output].hit_switch_code_end) {
// We have to delay so the end marker is still in_context_switch_code.
outputs_[output].in_context_switch_code = false;
outputs_[output].hit_switch_code_end = false;
// We're now back "on the clock".
if (options_.quantum_unit == sched_type_t::QUANTUM_TIME)
input->prev_time_in_quantum = cur_time;
// XXX: If we add a skip feature triggered on the output stream,
// we'll want to make sure skipping while in these switch and kernel
// sequences is handled correctly.
}
if (record_type_is_marker(record, marker_type, marker_value)) {
process_marker(*input, output, marker_type, marker_value);
}
if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS &&
record_type_is_instr_boundary(record, outputs_[output].last_record) &&
!outputs_[output].in_kernel_code) {
++input->instrs_in_quantum;
if (input->instrs_in_quantum > options_.quantum_duration_instrs) {
// We again prefer to switch to another input even if the current
// input has the oldest timestamp, prioritizing context switches
// over timestamp ordering.
VPRINT(this, 4,
"next_record[%d]: input %d hit end of instr quantum\n", output,
input->index);
preempt = true;
need_new_input = true;
input->instrs_in_quantum = 0;
++outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS];
}
} else if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) {
if (cur_time == 0 || cur_time < input->prev_time_in_quantum) {
VPRINT(this, 1,
"next_record[%d]: invalid time %" PRIu64 " vs start %" PRIu64
"\n",
output, cur_time, input->prev_time_in_quantum);
return sched_type_t::STATUS_INVALID;
}
input->time_spent_in_quantum += cur_time - input->prev_time_in_quantum;
prev_time_in_quantum = input->prev_time_in_quantum;
input->prev_time_in_quantum = cur_time;
double elapsed_micros =
static_cast<double>(input->time_spent_in_quantum) /
options_.time_units_per_us;
if (elapsed_micros >= options_.quantum_duration_us &&
// We only switch on instruction boundaries. We could possibly switch
// in between (e.g., scatter/gather long sequence of reads/writes) by
// setting input->switching_pre_instruction.
record_type_is_instr_boundary(record, outputs_[output].last_record)) {
VPRINT(
this, 4,
"next_record[%d]: input %d hit end of time quantum after %" PRIu64
"\n",
output, input->index, input->time_spent_in_quantum);
preempt = true;
need_new_input = true;
input->time_spent_in_quantum = 0;
++outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS];
}
}
}
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY &&
// For sched_type_t::MAP_TO_ANY_OUTPUT with timestamps: enforcing asked-for
// context switch rates is more important that honoring precise
// trace-buffer-based timestamp inter-input dependencies so we do not end a
// quantum early due purely to timestamps.
options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT &&
record_type_is_timestamp(record, input->next_timestamp))
need_new_input = true;
uint64_t prev_time_in_quantum = input->prev_time_in_quantum;
stream_status_t res = check_for_input_switch(
output, record, input, cur_time, need_new_input, preempt, blocked_time);
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_SKIPPED)
return res;
if (need_new_input) {
int prev_input = outputs_[output].cur_input;
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d)\n", output,
Expand All @@ -3579,7 +3422,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
VPRINT(this, 5, "next_record[%d]: queuing candidate record\n", output);
input->queue.push_back(record);
lock.unlock();
stream_status_t res = pick_next_input(output, blocked_time);
res = pick_next_input(output, blocked_time);
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_WAIT &&
res != sched_type_t::STATUS_SKIPPED)
return res;
Expand Down Expand Up @@ -3634,7 +3477,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
if (input->needs_roi && options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY &&
!input->regions_of_interest.empty()) {
input_ordinal_t prev_input = input->index;
stream_status_t res = advance_region_of_interest(output, record, *input);
res = advance_region_of_interest(output, record, *input);
if (res == sched_type_t::STATUS_SKIPPED) {
// We need either the queue or to re-de-ref the reader so we loop,
// but we do not want to come back here.
Expand Down
Loading

0 comments on commit 1977651

Please sign in to comment.