Skip to content

Commit

Permalink
i#6831 sched refactor: Add set_cur_input() hooks
Browse files Browse the repository at this point in the history
Adds swap_in_input() and swap_out_input() subclass hooks inside
set_cur_input().  This allows moving ready queue functions into the
dynamic subclass, and will be used there for future live workload
tracking for #7067.

Moves add_to_ready_queue(), add_to_ready_queue_hold_locks(),
add_to_unscheduled_queue(), and print_queue_stats() into the dynamic
subclass.

Issue: #7067, #6831
  • Loading branch information
derekbruening committed Nov 19, 2024
1 parent b158d1f commit e300c97
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 169 deletions.
161 changes: 154 additions & 7 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::~scheduler_dynamic_tmpl_t()
{
#ifndef NDEBUG
VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock acquired",
unscheduled_priority_.lock->get_count_acquired());
VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock contended",
unscheduled_priority_.lock->get_count_contended());
#endif
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
Expand Down Expand Up @@ -112,7 +123,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
target = *input->binding.begin();
else
output = (output + 1) % outputs_.size();
this->add_to_ready_queue(target, input);
add_to_ready_queue(target, input);
}
stream_status_t status = rebalance_queues(0, {});
if (status != sched_type_t::STATUS_OK) {
Expand All @@ -124,7 +135,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
#ifndef NDEBUG
status =
#endif
this->pop_from_ready_queue(i, i, queue_next);
pop_from_ready_queue(i, i, queue_next);
assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE);
if (queue_next == nullptr)
set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL);
Expand All @@ -137,6 +148,33 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_out_input(
output_ordinal_t output, input_ordinal_t input, int workload,
bool caller_holds_input_lock)
{
// Now that the caller has updated prev_info, add it to the ready queue (once on
// the queue others can see it and pop it off).
if (!inputs_[input].at_eof) {
// This is unsafe if the caller holds this input lock: we disallow it here
// and in the parent's set_cur_input().
assert(!caller_holds_input_lock);
add_to_ready_queue(output, &inputs_[input]);
}
// TODO i#7067: Track peak live core usage per workload here.
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_in_input(output_ordinal_t output,
input_ordinal_t input)
{
// TODO i#7067: Track peak live core usage per workload here.
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_output_active(
Expand Down Expand Up @@ -180,6 +218,19 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
VDO(this, 1, {
static int64_t global_heartbeat;
// 10K is too frequent for simple analyzer runs: it is too noisy with
// the new core-sharded-by-default for new users using defaults.
// 50K is a reasonable compromise.
// XXX: Add a runtime option to tweak this.
static constexpr int64_t GLOBAL_HEARTBEAT_CADENCE = 50000;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % GLOBAL_HEARTBEAT_CADENCE == 0) {
print_queue_stats();
}
});

uint64_t cur_time = get_output_time(output);
uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire);
if (last_time == 0) {
Expand Down Expand Up @@ -337,7 +388,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
// waiting.
set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL);
input_info_t *queue_next = nullptr;
stream_status_t status = this->pop_from_ready_queue(output, output, queue_next);
stream_status_t status = pop_from_ready_queue(output, output, queue_next);
if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_IDLE) {
outputs_[output].waiting = true;
Expand Down Expand Up @@ -643,7 +694,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::process_marker(
// for things on unsched q); once it's on the new queue we don't
// do anything further here so we're good to go.
target_lock.unlock();
this->add_to_ready_queue(resume_output, target);
add_to_ready_queue(resume_output, target);
target_lock.lock();
} else {
// We assume blocked_time is from _ARG_TIMEOUT and is not from
Expand Down Expand Up @@ -780,7 +831,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::rebalance_queues(
// We remove from the back to avoid penalizing the next-to-run entries
// at the front of the queue by putting them at the back of another
// queue.
status = this->pop_from_ready_queue_hold_locks(
status = pop_from_ready_queue_hold_locks(
i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next,
/*from_back=*/true);
if (status == sched_type_t::STATUS_OK && queue_next != nullptr) {
Expand All @@ -805,7 +856,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::rebalance_queues(
input.binding.find(i) != input.binding.end()) {
VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n",
iteration, i, ordinal);
this->add_to_ready_queue_hold_locks(i, &input);
add_to_ready_queue_hold_locks(i, &input);
} else {
incompatible_inputs.push_back(ordinal);
}
Expand Down Expand Up @@ -912,6 +963,62 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::ready_queue_empty(
return outputs_[output].ready_queue.queue.empty();
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_unscheduled_queue(
input_info_t *input)
{
assert(input->lock->owned_by_cur_thread());
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
assert(input->unscheduled &&
input->blocked_time == 0); // Else should be in regular queue.
VPRINT(this, 4, "add_to_unscheduled_queue (pre-size %zu): input %d priority %d\n",
unscheduled_priority_.queue.size(), input->index, input->priority);
input->queue_counter = ++unscheduled_priority_.fifo_counter;
unscheduled_priority_.queue.push(input);
input->prev_output = input->containing_output;
input->containing_output = sched_type_t::INVALID_INPUT_ORDINAL;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_ready_queue_hold_locks(
output_ordinal_t output, input_info_t *input)
{
assert(input->lock->owned_by_cur_thread());
assert(!this->need_output_lock() ||
outputs_[output].ready_queue.lock->owned_by_cur_thread());
if (input->unscheduled && input->blocked_time == 0) {
// Ensure we get prev_output set for start-unscheduled so they won't
// all resume on output #0 but rather on the initial round-robin assignment.
input->containing_output = output;
add_to_unscheduled_queue(input);
return;
}
assert(input->binding.empty() || input->binding.find(output) != input->binding.end());
VPRINT(this, 4,
"add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta "
"%" PRIu64 " block time %" PRIu64 " start time %" PRIu64 "\n",
outputs_[output].ready_queue.queue.size(), input->index, input->priority,
input->reader->get_last_timestamp() - input->base_timestamp,
input->blocked_time, input->blocked_start_time);
if (input->blocked_time > 0)
++outputs_[output].ready_queue.num_blocked;
input->queue_counter = ++outputs_[output].ready_queue.fifo_counter;
outputs_[output].ready_queue.queue.push(input);
input->containing_output = output;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::add_to_ready_queue(
output_ordinal_t output, input_info_t *input)
{
auto scoped_lock = acquire_scoped_output_lock_if_necessary(output);
std::lock_guard<mutex_dbg_owned> input_lock(*input->lock);
add_to_ready_queue_hold_locks(output, input);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_locks(
Expand Down Expand Up @@ -1037,7 +1144,7 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_lock
// Re-add the blocked ones to the back.
for (input_info_t *save : blocked) {
std::lock_guard<mutex_dbg_owned> input_lock(*save->lock);
this->add_to_ready_queue_hold_locks(from_output, save);
add_to_ready_queue_hold_locks(from_output, save);
}
auto res_lock = (res == nullptr) ? std::unique_lock<mutex_dbg_owned>()
: std::unique_lock<mutex_dbg_owned>(*res->lock);
Expand Down Expand Up @@ -1098,6 +1205,46 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
return status;
}

template <typename RecordType, typename ReaderType>
void
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::print_queue_stats()
{
size_t unsched_size = 0;
{
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
unsched_size = unscheduled_priority_.queue.size();
}
int live = this->live_input_count_.load(std::memory_order_acquire);
// Make our multi-line output more atomic.
std::ostringstream ostr;
ostr << "Queue snapshot: inputs: " << live - unsched_size << " schedulable, "
<< unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n";
for (unsigned int i = 0; i < outputs_.size(); ++i) {
auto lock = acquire_scoped_output_lock_if_necessary(i);
uint64_t cur_time = get_output_time(i);
ostr << " out #" << i << " @" << cur_time << ": running #"
<< outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size()
<< " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n";
std::set<input_info_t *> readd;
input_info_t *res = nullptr;
while (!outputs_[i].ready_queue.queue.empty()) {
res = outputs_[i].ready_queue.queue.top();
readd.insert(res);
outputs_[i].ready_queue.queue.pop();
std::lock_guard<mutex_dbg_owned> input_lock(*res->lock);
if (res->blocked_time > 0) {
ostr << " " << res->index << " still blocked for "
<< res->blocked_time - (cur_time - res->blocked_start_time) << "\n";
}
}
// Re-add the ones we skipped, but without changing their counters so we
// preserve the prior FIFO order.
for (input_info_t *add : readd)
outputs_[i].ready_queue.queue.push(add);
}
VPRINT(this, 0, "%s\n", ostr.str().c_str());
}

template class scheduler_dynamic_tmpl_t<memref_t, reader_t>;
template class scheduler_dynamic_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;
Expand Down
17 changes: 17 additions & 0 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::swap_out_input(
output_ordinal_t output, input_ordinal_t input, int workload,
bool caller_holds_input_lock)
{
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::swap_in_input(output_ordinal_t output,
input_ordinal_t input)
{
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
Expand Down
Loading

0 comments on commit e300c97

Please sign in to comment.