Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#6831 sched refactor: Add set_cur_input() hooks #7089

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 154 additions & 7 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::~scheduler_dynamic_tmpl_t()
{
#ifndef NDEBUG
VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock acquired",
unscheduled_priority_.lock->get_count_acquired());
VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock contended",
unscheduled_priority_.lock->get_count_contended());
#endif
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
Expand Down Expand Up @@ -112,7 +123,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
target = *input->binding.begin();
else
output = (output + 1) % outputs_.size();
this->add_to_ready_queue(target, input);
add_to_ready_queue(target, input);
}
stream_status_t status = rebalance_queues(0, {});
if (status != sched_type_t::STATUS_OK) {
Expand All @@ -124,7 +135,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
#ifndef NDEBUG
status =
#endif
this->pop_from_ready_queue(i, i, queue_next);
pop_from_ready_queue(i, i, queue_next);
assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE);
if (queue_next == nullptr)
set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL);
Expand All @@ -137,6 +148,33 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_out_input(
output_ordinal_t output, input_ordinal_t input, int workload,
bool caller_holds_input_lock)
{
// Now that the caller has updated prev_info, add it to the ready queue (once on
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
// the queue others can see it and pop it off).
if (!inputs_[input].at_eof) {
// This is unsafe if the caller holds this input lock: we disallow it here
// and in the parent's set_cur_input().
assert(!caller_holds_input_lock);
add_to_ready_queue(output, &inputs_[input]);
}
// TODO i#7067: Track peak live core usage per workload here.
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_in_input(output_ordinal_t output,
input_ordinal_t input)
{
// TODO i#7067: Track peak live core usage per workload here.
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_output_active(
Expand Down Expand Up @@ -180,6 +218,19 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
VDO(this, 1, {
static int64_t global_heartbeat;
// 10K is too frequent for simple analyzer runs: it is too noisy with
// the new core-sharded-by-default for new users using defaults.
// 50K is a reasonable compromise.
// XXX: Add a runtime option to tweak this.
static constexpr int64_t GLOBAL_HEARTBEAT_CADENCE = 50000;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % GLOBAL_HEARTBEAT_CADENCE == 0) {
print_queue_stats();
}
});

uint64_t cur_time = get_output_time(output);
uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire);
if (last_time == 0) {
Expand Down Expand Up @@ -337,7 +388,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
// waiting.
set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL);
input_info_t *queue_next = nullptr;
stream_status_t status = this->pop_from_ready_queue(output, output, queue_next);
stream_status_t status = pop_from_ready_queue(output, output, queue_next);
if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_IDLE) {
outputs_[output].waiting = true;
Expand Down Expand Up @@ -643,7 +694,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::process_marker(
// for things on unsched q); once it's on the new queue we don't
// do anything further here so we're good to go.
target_lock.unlock();
this->add_to_ready_queue(resume_output, target);
add_to_ready_queue(resume_output, target);
target_lock.lock();
} else {
// We assume blocked_time is from _ARG_TIMEOUT and is not from
Expand Down Expand Up @@ -780,7 +831,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::rebalance_queues(
// We remove from the back to avoid penalizing the next-to-run entries
// at the front of the queue by putting them at the back of another
// queue.
status = this->pop_from_ready_queue_hold_locks(
status = pop_from_ready_queue_hold_locks(
i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next,
/*from_back=*/true);
if (status == sched_type_t::STATUS_OK && queue_next != nullptr) {
Expand All @@ -805,7 +856,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::rebalance_queues(
input.binding.find(i) != input.binding.end()) {
VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n",
iteration, i, ordinal);
this->add_to_ready_queue_hold_locks(i, &input);
add_to_ready_queue_hold_locks(i, &input);
} else {
incompatible_inputs.push_back(ordinal);
}
Expand Down Expand Up @@ -912,6 +963,62 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::ready_queue_empty(
return outputs_[output].ready_queue.queue.empty();
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_unscheduled_queue(
input_info_t *input)
{
assert(input->lock->owned_by_cur_thread());
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
assert(input->unscheduled &&
input->blocked_time == 0); // Else should be in regular queue.
VPRINT(this, 4, "add_to_unscheduled_queue (pre-size %zu): input %d priority %d\n",
unscheduled_priority_.queue.size(), input->index, input->priority);
input->queue_counter = ++unscheduled_priority_.fifo_counter;
unscheduled_priority_.queue.push(input);
input->prev_output = input->containing_output;
input->containing_output = sched_type_t::INVALID_INPUT_ORDINAL;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_ready_queue_hold_locks(
output_ordinal_t output, input_info_t *input)
{
assert(input->lock->owned_by_cur_thread());
assert(!this->need_output_lock() ||
outputs_[output].ready_queue.lock->owned_by_cur_thread());
if (input->unscheduled && input->blocked_time == 0) {
// Ensure we get prev_output set for start-unscheduled so they won't
// all resume on output #0 but rather on the initial round-robin assignment.
input->containing_output = output;
add_to_unscheduled_queue(input);
return;
}
assert(input->binding.empty() || input->binding.find(output) != input->binding.end());
VPRINT(this, 4,
"add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta "
"%" PRIu64 " block time %" PRIu64 " start time %" PRIu64 "\n",
outputs_[output].ready_queue.queue.size(), input->index, input->priority,
input->reader->get_last_timestamp() - input->base_timestamp,
input->blocked_time, input->blocked_start_time);
if (input->blocked_time > 0)
++outputs_[output].ready_queue.num_blocked;
input->queue_counter = ++outputs_[output].ready_queue.fifo_counter;
outputs_[output].ready_queue.queue.push(input);
input->containing_output = output;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_ready_queue(
output_ordinal_t output, input_info_t *input)
{
auto scoped_lock = acquire_scoped_output_lock_if_necessary(output);
std::lock_guard<mutex_dbg_owned> input_lock(*input->lock);
add_to_ready_queue_hold_locks(output, input);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_locks(
Expand Down Expand Up @@ -1037,7 +1144,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_lock
// Re-add the blocked ones to the back.
for (input_info_t *save : blocked) {
std::lock_guard<mutex_dbg_owned> input_lock(*save->lock);
this->add_to_ready_queue_hold_locks(from_output, save);
add_to_ready_queue_hold_locks(from_output, save);
}
auto res_lock = (res == nullptr) ? std::unique_lock<mutex_dbg_owned>()
: std::unique_lock<mutex_dbg_owned>(*res->lock);
Expand Down Expand Up @@ -1098,6 +1205,46 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
return status;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::print_queue_stats()
{
size_t unsched_size = 0;
{
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
unsched_size = unscheduled_priority_.queue.size();
}
int live = this->live_input_count_.load(std::memory_order_acquire);
// Make our multi-line output more atomic.
std::ostringstream ostr;
ostr << "Queue snapshot: inputs: " << live - unsched_size << " schedulable, "
<< unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n";
for (unsigned int i = 0; i < outputs_.size(); ++i) {
auto lock = acquire_scoped_output_lock_if_necessary(i);
uint64_t cur_time = get_output_time(i);
ostr << " out #" << i << " @" << cur_time << ": running #"
<< outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size()
<< " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n";
std::set<input_info_t *> readd;
input_info_t *res = nullptr;
while (!outputs_[i].ready_queue.queue.empty()) {
res = outputs_[i].ready_queue.queue.top();
readd.insert(res);
outputs_[i].ready_queue.queue.pop();
std::lock_guard<mutex_dbg_owned> input_lock(*res->lock);
if (res->blocked_time > 0) {
ostr << " " << res->index << " still blocked for "
<< res->blocked_time - (cur_time - res->blocked_start_time) << "\n";
}
}
// Re-add the ones we skipped, but without changing their counters so we
// preserve the prior FIFO order.
for (input_info_t *add : readd)
outputs_[i].ready_queue.queue.push(add);
}
VPRINT(this, 0, "%s\n", ostr.str().c_str());
}

template class scheduler_dynamic_tmpl_t<memref_t, reader_t>;
template class scheduler_dynamic_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;
Expand Down
17 changes: 17 additions & 0 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::swap_out_input(
output_ordinal_t output, input_ordinal_t input, int workload,
bool caller_holds_input_lock)
{
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::swap_in_input(output_ordinal_t output,
input_ordinal_t input)
{
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
Expand Down
Loading
Loading