i#6831 sched refactor, step 7: Split eof_or_idle #7083

Merged 7 commits on Nov 15, 2024
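Step 7 of the i#6831 scheduler refactor splits eof_or_idle(): the mode-specific choice between reporting end-of-file and going idle moves into a per-subclass eof_or_idle_for_mode() hook. The rendered portion of the diff shows the fixed-mode side in scheduler_fixed.cpp (set_initial_schedule(), pick_next_input_for_mode(), check_for_input_switch(), and the new eof_or_idle_for_mode()), plus a comment tweak in scheduler.cpp; the bulk of the change, in scheduler_dynamic.cpp, is not rendered below.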
3 changes: 2 additions & 1 deletion clients/drcachesim/scheduler/scheduler.cpp
@@ -60,7 +60,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
new scheduler_replay_tmpl_t<RecordType, ReaderType>);
} else {
// Non-dynamic and non-replay fixed modes such as analyzer serial and
// parallel modes.
// parallel modes. Although serial does interleave by timestamp, it is
// closer to parallel mode than the file-based replay modes.
impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
scheduler_impl_deleter_t>(
new scheduler_fixed_tmpl_t<RecordType, ReaderType>);
838 changes: 782 additions & 56 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp

Large diffs are not rendered by default.

84 changes: 69 additions & 15 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
@@ -35,11 +35,9 @@
#include "scheduler.h"
#include "scheduler_impl.h"

#include <atomic>
#include <cinttypes>
#include <cstdint>
#include <mutex>
#include <thread>

#include "memref.h"
#include "mutex_dbg_owned.h"
@@ -50,45 +48,89 @@
namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
{
if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// Assign the inputs up front to avoid locks once we're in parallel mode.
// We use a simple round-robin static assignment for now.
for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
size_t index = i % outputs_.size();
if (outputs_[index].input_indices.empty())
set_cur_input(static_cast<input_ordinal_t>(index), i);
outputs_[index].input_indices.push_back(i);
VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index);
}
} else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) {
if (options_.replay_as_traced_istream != nullptr) {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
} else if (outputs_.size() > 1) {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
} else if (inputs_.size() == 1) {
set_cur_input(0, 0);
} else {
// The old file_reader_t interleaving would output the top headers for every
// thread first and then pick the oldest timestamp once it reached a
// timestamp. We instead queue those headers so we can start directly with the
// oldest timestamp's thread.
uint64_t min_time = std::numeric_limits<uint64_t>::max();
input_ordinal_t min_input = -1;
for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
if (inputs_[i].next_timestamp < min_time) {
min_time = inputs_[i].next_timestamp;
min_input = i;
}
}
if (min_input < 0)
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
set_cur_input(0, static_cast<input_ordinal_t>(min_input));
}
} else {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
}
return sched_type_t::STATUS_SUCCESS;
}
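
To make the MAP_TO_CONSISTENT_OUTPUT assignment above concrete: with, say, 5 inputs and 2 outputs, output #0 is statically handed inputs 0, 2, 4 and output #1 gets inputs 1, 3, and each output then walks only its own list, so no locking is needed once the parallel phase starts. A minimal standalone sketch of that modulo round-robin, with plain vectors standing in for inputs_/outputs_ (not part of the PR):

#include <cstdio>
#include <vector>

int main()
{
    const int num_inputs = 5;  // hypothetical trace with 5 input threads
    const int num_outputs = 2; // two output streams (simulated cores)
    // input_indices[o] lists the inputs pre-assigned to output o.
    std::vector<std::vector<int>> input_indices(num_outputs);
    for (int i = 0; i < num_inputs; ++i) {
        int index = i % num_outputs; // same modulo rule as set_initial_schedule()
        input_indices[index].push_back(i);
    }
    for (int out = 0; out < num_outputs; ++out) {
        std::printf("output #%d:", out);
        for (int in : input_indices[out])
            std::printf(" input %d", in);
        std::printf("\n");
    }
    // Prints:
    //   output #0: input 0 input 2 input 4
    //   output #1: input 1 input 3
    return 0;
}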

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
uint64_t min_time = std::numeric_limits<uint64_t>::max();
for (size_t i = 0; i < this->inputs_.size(); ++i) {
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[i].lock);
if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 &&
this->inputs_[i].next_timestamp < min_time) {
min_time = this->inputs_[i].next_timestamp;
for (size_t i = 0; i < inputs_.size(); ++i) {
std::lock_guard<mutex_dbg_owned> lock(*inputs_[i].lock);
if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
inputs_[i].next_timestamp < min_time) {
min_time = inputs_[i].next_timestamp;
index = static_cast<int>(i);
}
}
if (index < 0) {
stream_status_t status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
index = this->outputs_[output].cur_input;
index = outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
}
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n",
output, min_time, index);
} else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
} else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++this->outputs_[output].input_indices_index;
if (indices_index >=
static_cast<int>(this->outputs_[output].input_indices.size())) {
int indices_index = ++outputs_[output].input_indices_index;
if (indices_index >= static_cast<int>(outputs_[output].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output);
return sched_type_t::STATUS_EOF;
}
index = this->outputs_[output].input_indices[indices_index];
index = outputs_[output].input_indices[indices_index];
VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n",
output, indices_index, index);
} else
@@ -103,12 +145,24 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time,
bool &need_new_input, bool &preempt, uint64_t &blocked_time)
{
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
this->record_type_is_timestamp(record, input->next_timestamp))
need_new_input = true;
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::eof_or_idle_for_mode(
output_ordinal_t output, input_ordinal_t prev_input)
{
if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT ||
this->live_input_count_.load(std::memory_order_acquire) == 0) {
return sched_type_t::STATUS_EOF;
}
return sched_type_t::STATUS_IDLE;
}
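
This new eof_or_idle_for_mode() hook is the fixed-mode half of the split named in the PR title: the choice between STATUS_EOF and STATUS_IDLE is now made per subclass, while the shared bookkeeping stays in the base class. The base-class side is not shown in this diff, so the following is only an illustrative sketch of how such a split is typically wired, using the names visible here; the shared body is assumed, not quoted from the PR:

// Illustrative sketch only -- the actual shared implementation is in the
// scheduler_impl sources, which this diff does not render.
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_impl_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output,
                                                           input_ordinal_t prev_input)
{
    // Ask the subclass whether this output is truly finished (the fixed modes
    // above answer STATUS_EOF for statically assigned outputs or when no live
    // inputs remain) or should keep waiting (STATUS_IDLE).
    stream_status_t status = eof_or_idle_for_mode(output, prev_input);
    if (status == sched_type_t::STATUS_EOF)
        return status;
    // Mode-independent idle handling would follow here: callers such as
    // pick_next_input_for_mode() above expect STATUS_STOLE when a new input
    // was handed to this output, and pass any other status straight through.
    return sched_type_t::STATUS_IDLE;
}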

template class scheduler_fixed_tmpl_t<memref_t, reader_t>;
template class scheduler_fixed_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;