Skip to content

Commit

Permalink
i#6882: Set drmemtrace scheduler init state to post-skip (#6894)
Browse files Browse the repository at this point in the history
For inputs with regions of interest that skip over their initial
records, the drmemtrace scheduler was using the initial records for the
initial timestamp and start-unscheduled state, which is incorrect.

Solves this by applying an initial region skip at init time instead of
waiting for an input to be scheduled. This requires shifting around when
record file output is recorded.

Adds a unit test that fails without this fix as it leaves one input
permanently unscheduled and has the two inputs in the wrong order
without using the skip timestamp.

Fixes #6882
  • Loading branch information
derekbruening authored Jul 19, 2024
1 parent 8d5e61c commit e099287
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 55 deletions.
151 changes: 105 additions & 46 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::process_next_initial_record(
return false;
}
} else if (marker_type == TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE) {
if (options_.honor_direct_switches) {
if (options_.honor_direct_switches && options_.mapping != MAP_AS_PREVIOUSLY) {
input.unscheduled = true;
// Ignore this marker during regular processing.
input.skip_next_unscheduled = true;
Expand All @@ -1701,6 +1701,29 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
// output stream(s).
for (size_t i = 0; i < inputs_.size(); ++i) {
input_info_t &input = inputs_[i];

// If the input jumps to the middle immediately, do that now so we'll have
// the proper start timestamp.
if (!input.regions_of_interest.empty() &&
// The docs say for replay we allow the user to pass ROI but ignore it.
// Maybe we should disallow it so we don't need checks like this?
options_.mapping != MAP_AS_PREVIOUSLY) {
RecordType record = create_invalid_record();
sched_type_t::stream_status_t res =
advance_region_of_interest(/*output=*/-1, record, input);
if (res == sched_type_t::STATUS_SKIPPED) {
input.next_timestamp =
static_cast<uintptr_t>(input.reader->get_last_timestamp());
// We can skip the rest of the loop here (the filetype will be there
// in the stream).
continue;
}
if (res != sched_type_t::STATUS_OK) {
VPRINT(this, 1, "Failed to advance initial ROI with status %d\n", res);
return sched_type_t::STATUS_ERROR_RANGE_INVALID;
}
}

bool found_filetype = false;
bool found_timestamp = !gather_timestamps || input.next_timestamp > 0;
if (process_next_initial_record(input, create_invalid_record(), found_filetype,
Expand Down Expand Up @@ -2078,30 +2101,60 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
return sched_type_t::STATUS_OK;

VPRINT(this, 2,
"skipping from %" PRIu64 " to %" PRIu64 " instrs (%" PRIu64
"skipping from %" PRId64 " to %" PRIu64 " instrs (%" PRIu64
" in reader) for ROI\n",
cur_instr, cur_range.start_instruction,
cur_range.start_instruction - cur_reader_instr - 1);
if (options_.schedule_record_ostream != nullptr) {
sched_type_t::stream_status_t status = close_schedule_segment(output, input);
if (status != sched_type_t::STATUS_OK)
return status;
status = record_schedule_segment(output, schedule_record_t::SKIP, input.index,
cur_instr, cur_range.start_instruction);
if (status != sched_type_t::STATUS_OK)
return status;
status = record_schedule_segment(output, schedule_record_t::DEFAULT, input.index,
cur_range.start_instruction);
if (status != sched_type_t::STATUS_OK)
return status;
if (output >= 0) {
record_schedule_skip(output, input.index, cur_instr,
cur_range.start_instruction);
} // Else, will be done in set_cur_input once assigned to an output.
}
if (cur_range.start_instruction < cur_reader_instr) {
// We do not support skipping without skipping over the pre-read: we would
// need to extract from the queue.
return sched_type_t::STATUS_INVALID;
}
return skip_instructions(output, input,
cur_range.start_instruction - cur_reader_instr - 1);
return skip_instructions(input, cur_range.start_instruction - cur_reader_instr - 1);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::record_schedule_skip(output_ordinal_t output,
input_ordinal_t input,
uint64_t start_instruction,
uint64_t stop_instruction)
{
if (options_.schedule_record_ostream == nullptr)
return sched_type_t::STATUS_INVALID;
sched_type_t::stream_status_t status;
// Close any prior default record for this input. If we switched inputs,
// we'll already have closed the prior in set_cur_input().
if (outputs_[output].record.back().type == schedule_record_t::DEFAULT &&
outputs_[output].record.back().key.input == input) {
status = close_schedule_segment(output, inputs_[input]);
if (status != sched_type_t::STATUS_OK)
return status;
}
if (outputs_[output].record.size() == 1) {
// Replay doesn't handle starting out with a skip record: we need a
// start=0,stop=0 dummy entry to get things rolling at the start of
// an output's records, if we're the first record after the version.
assert(outputs_[output].record.back().type == schedule_record_t::VERSION);
status = record_schedule_segment(output, schedule_record_t::DEFAULT, input, 0, 0);
if (status != sched_type_t::STATUS_OK)
return status;
}
status = record_schedule_segment(output, schedule_record_t::SKIP, input,
start_instruction, stop_instruction);
if (status != sched_type_t::STATUS_OK)
return status;
status = record_schedule_segment(output, schedule_record_t::DEFAULT, input,
stop_instruction);
if (status != sched_type_t::STATUS_OK)
return status;
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
Expand All @@ -2124,8 +2177,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::clear_input_queue(input_info_t &input)

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t output,
input_info_t &input,
scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(input_info_t &input,
uint64_t skip_amount)
{
// reader_t::at_eof_ is true until init() is called.
Expand Down Expand Up @@ -2161,28 +2213,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
}
}
input.in_cur_region = true;
auto *stream = outputs_[output].stream;

// We've documented that an output stream's ordinals ignore skips in its input
// streams, so we do not need to remember the input's ordinals pre-skip and increase
// our output's ordinals commensurately post-skip.

// If we skipped from the start we may not have seen the initial headers:
// use the input's cached copies.
// We set the version and filetype up front for outputs with
// an initial input, so we check a different field to detect a
// skip.
if (stream->cache_line_size_ == 0 ||
// Check the version too as a fallback for inputs with no cache size.
stream->version_ == 0) {
stream->version_ = input.reader->get_version();
stream->last_timestamp_ = input.reader->get_last_timestamp();
stream->first_timestamp_ = input.reader->get_first_timestamp();
stream->filetype_ = input.reader->get_filetype();
stream->cache_line_size_ = input.reader->get_cache_line_size();
stream->chunk_instr_count_ = input.reader->get_chunk_instr_count();
stream->page_size_ = input.reader->get_page_size();
}
// We let the user know we've skipped. There's no discontinuity for the
// first one so we do not insert a marker there (if we do want to insert one,
// we need to update the view tool to handle a window marker as the very
Expand Down Expand Up @@ -2229,6 +2264,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::record_schedule_segment(
// idle records quickly balloon the file.
return sched_type_t::STATUS_OK;
}
VPRINT(this, 4,
"recording out=%d type=%d input=%d start=%" PRIu64 " stop=%" PRIu64
" time=%" PRIu64 "\n",
output, type, input, start_instruction, stop_instruction, timestamp);
outputs_[output].record.emplace_back(type, input, start_instruction, stop_instruction,
timestamp);
// The stop is typically updated later in close_schedule_segment().
Expand Down Expand Up @@ -2275,9 +2314,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::close_schedule_segment(output_ordinal_
input.index);
++instr_ord;
}
VPRINT(
this, 3, "close_schedule_segment: input=%d start=%" PRId64 " stop=%" PRId64 "\n",
input.index, outputs_[output].record.back().value.start_instruction, instr_ord);
VPRINT(this, 3,
"close_schedule_segment: input=%d type=%d start=%" PRIu64 " stop=%" PRIu64
"\n",
input.index, outputs_[output].record.back().type,
outputs_[output].record.back().value.start_instruction, instr_ord);
// Check for empty default entries, except the starter 0,0 ones.
assert(outputs_[output].record.back().type != schedule_record_t::DEFAULT ||
outputs_[output].record.back().value.start_instruction < instr_ord ||
Expand Down Expand Up @@ -2500,11 +2541,17 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,

std::lock_guard<std::mutex> lock(*inputs_[input].lock);

if (prev_input < 0 && outputs_[output].stream->filetype_ == 0) {
if (prev_input < 0 && outputs_[output].stream->version_ == 0) {
// Set the version and filetype up front, to let the user query at init time
// as documented.
outputs_[output].stream->version_ = inputs_[input].reader->get_version();
outputs_[output].stream->filetype_ = inputs_[input].reader->get_filetype();
// as documented. Also set the other fields in case we did a skip for ROI.
auto *stream = outputs_[output].stream;
stream->version_ = inputs_[input].reader->get_version();
stream->last_timestamp_ = inputs_[input].reader->get_last_timestamp();
stream->first_timestamp_ = inputs_[input].reader->get_first_timestamp();
stream->filetype_ = inputs_[input].reader->get_filetype();
stream->cache_line_size_ = inputs_[input].reader->get_cache_line_size();
stream->chunk_instr_count_ = inputs_[input].reader->get_chunk_instr_count();
stream->page_size_ = inputs_[input].reader->get_page_size();
}

if (inputs_[input].pid != INVALID_PID) {
Expand Down Expand Up @@ -2541,14 +2588,26 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
}

inputs_[input].prev_time_in_quantum = outputs_[output].cur_time;

if (options_.schedule_record_ostream != nullptr) {
uint64_t instr_ord = get_instr_ordinal(inputs_[input]);
VPRINT(this, 3, "set_cur_input: recording input=%d start=%" PRId64 "\n", input,
instr_ord);
sched_type_t::stream_status_t status =
record_schedule_segment(output, schedule_record_t::DEFAULT, input, instr_ord);
if (status != sched_type_t::STATUS_OK)
return status;
if (!inputs_[input].regions_of_interest.empty() &&
inputs_[input].cur_region == 0 && inputs_[input].in_cur_region &&
(instr_ord == inputs_[input].regions_of_interest[0].start_instruction ||
// The ord may be 1 less because we're still on the inserted timestamp.
instr_ord + 1 == inputs_[input].regions_of_interest[0].start_instruction)) {
// We skipped during init but didn't have an output for recording the skip:
// record it now.
record_schedule_skip(output, input, 0,
inputs_[input].regions_of_interest[0].start_instruction);
} else {
sched_type_t::stream_status_t status = record_schedule_segment(
output, schedule_record_t::DEFAULT, input, instr_ord);
if (status != sched_type_t::STATUS_OK)
return status;
}
}
return STATUS_OK;
}
Expand Down Expand Up @@ -2665,7 +2724,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
"next_record[%d]: skipping from %" PRId64 " to %" PRId64
" in %d for schedule\n",
output, cur_reader_instr, segment.stop_instruction, index);
auto status = skip_instructions(output, inputs_[index],
auto status = skip_instructions(inputs_[index],
segment.stop_instruction - cur_reader_instr -
1 /*exclusive*/);
// Increment the region to get window id markers with ordinals.
Expand Down
22 changes: 21 additions & 1 deletion clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
STATUS_ERROR_FILE_READ_FAILED, /**< Error: file read failed. */
STATUS_ERROR_NOT_IMPLEMENTED, /**< Error: not implemented. */
STATUS_ERROR_FILE_WRITE_FAILED, /**< Error: file write failed. */
STATUS_ERROR_RANGE_INVALID, /**< Error: region of interest invalid. */
};

/**
Expand Down Expand Up @@ -204,6 +205,12 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
{
}
/** Convenience constructor for common usage. */
input_thread_info_t(memref_tid_t tid, std::vector<range_t> regions)
: tids(1, tid)
, regions_of_interest(regions)
{
}
/** Convenience constructor for common usage. */
input_thread_info_t(memref_tid_t tid, int priority)
: tids(1, tid)
, priority(priority)
Expand Down Expand Up @@ -252,6 +259,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* separation, but it is not inserted prior to the first range. A
* #dynamorio::drmemtrace::TRACE_TYPE_THREAD_EXIT record is inserted after the
* final range. These ranges must be non-overlapping and in increasing order.
*
* Be aware that selecting a subset of code can remove inter-input
* communication steps that could be required for forward progress.
* For example, if selected subsets include #TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE
* with no timeout but do not include a corresponding
* #TRACE_MARKER_TYPE_SYSCALL_SCHEDULE for wakeup, an input could remain
* unscheduled.
*/
std::vector<range_t> regions_of_interest;
};
Expand Down Expand Up @@ -1518,7 +1532,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// Does a direct skip, unconditionally.
// The caller must hold the input.lock.
stream_status_t
skip_instructions(output_ordinal_t output, input_info_t &input, uint64_t skip_amount);
skip_instructions(input_info_t &input, uint64_t skip_amount);

// Records an input skip in the output's recorded schedule.
// The caller must hold the input.lock.
stream_status_t
record_schedule_skip(output_ordinal_t output, input_ordinal_t input,
uint64_t start_instruction, uint64_t stop_instruction);

scheduler_status_t
read_and_instantiate_traced_schedule();
Expand Down
Loading

0 comments on commit e099287

Please sign in to comment.