From e300c974d49a80fa8e2e86cf36c069538f4499e5 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Thu, 14 Nov 2024 13:45:56 -0500 Subject: [PATCH] i#6831 sched refactor: Add set_cur_input() hooks Adds swap_in_input() and swap_out_input() subclass hooks inside set_cur_input(). This allows moving ready queue functions into the dynamic subclass, and will be used there for future live workload tracking for #7067. Moves add_to_ready_queue(), add_to_ready_queue_hold_locks(), add_to_unscheduled_queue(), and print_queue_stats() into the dynamic subclass. Issue: #7067, #6831 --- .../scheduler/scheduler_dynamic.cpp | 161 ++++++++++++++++- .../drcachesim/scheduler/scheduler_fixed.cpp | 17 ++ .../drcachesim/scheduler/scheduler_impl.cpp | 165 +++--------------- clients/drcachesim/scheduler/scheduler_impl.h | 90 +++++++--- .../drcachesim/scheduler/scheduler_replay.cpp | 17 ++ .../drcachesim/tests/scheduler_unit_tests.cpp | 11 ++ 6 files changed, 292 insertions(+), 169 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index b2a63a6ed90..79e5d87b990 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -57,6 +57,17 @@ namespace dynamorio { namespace drmemtrace { +template +scheduler_dynamic_tmpl_t::~scheduler_dynamic_tmpl_t() +{ +#ifndef NDEBUG + VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock acquired", + unscheduled_priority_.lock->get_count_acquired()); + VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock contended", + unscheduled_priority_.lock->get_count_contended()); +#endif +} + template typename scheduler_tmpl_t::scheduler_status_t scheduler_dynamic_tmpl_t::set_initial_schedule( @@ -112,7 +123,7 @@ scheduler_dynamic_tmpl_t::set_initial_schedule( target = *input->binding.begin(); else output = (output + 1) % outputs_.size(); - this->add_to_ready_queue(target, input); + add_to_ready_queue(target, input); } stream_status_t status = rebalance_queues(0, {}); if (status != sched_type_t::STATUS_OK) { @@ -124,7 +135,7 @@ scheduler_dynamic_tmpl_t::set_initial_schedule( #ifndef NDEBUG status = #endif - this->pop_from_ready_queue(i, i, queue_next); + pop_from_ready_queue(i, i, queue_next); assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE); if (queue_next == nullptr) set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); @@ -137,6 +148,33 @@ scheduler_dynamic_tmpl_t::set_initial_schedule( return sched_type_t::STATUS_SUCCESS; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::swap_out_input( + output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) +{ + // Now that the caller has updated prev_info, add it to the ready queue (once on + // the queue others can see it and pop it off). + if (!inputs_[input].at_eof) { + // This is unsafe if the caller holds this input lock: we disallow it here + // and in the parent's set_cur_input(). + assert(!caller_holds_input_lock); + add_to_ready_queue(output, &inputs_[input]); + } + // TODO i#7067: Track peak live core usage per workload here. + return sched_type_t::STATUS_OK; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::swap_in_input(output_ordinal_t output, + input_ordinal_t input) +{ + // TODO i#7067: Track peak live core usage per workload here. + return sched_type_t::STATUS_OK; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::set_output_active( @@ -180,6 +218,19 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) { + VDO(this, 1, { + static int64_t global_heartbeat; + // 10K is too frequent for simple analyzer runs: it is too noisy with + // the new core-sharded-by-default for new users using defaults. + // 50K is a reasonable compromise. + // XXX: Add a runtime option to tweak this. + static constexpr int64_t GLOBAL_HEARTBEAT_CADENCE = 50000; + // We are ok with races as the cadence is approximate. + if (++global_heartbeat % GLOBAL_HEARTBEAT_CADENCE == 0) { + print_queue_stats(); + } + }); + uint64_t cur_time = get_output_time(output); uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); if (last_time == 0) { @@ -337,7 +388,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( // waiting. set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); input_info_t *queue_next = nullptr; - stream_status_t status = this->pop_from_ready_queue(output, output, queue_next); + stream_status_t status = pop_from_ready_queue(output, output, queue_next); if (status != sched_type_t::STATUS_OK) { if (status == sched_type_t::STATUS_IDLE) { outputs_[output].waiting = true; @@ -643,7 +694,7 @@ scheduler_dynamic_tmpl_t::process_marker( // for things on unsched q); once it's on the new queue we don't // do anything further here so we're good to go. target_lock.unlock(); - this->add_to_ready_queue(resume_output, target); + add_to_ready_queue(resume_output, target); target_lock.lock(); } else { // We assume blocked_time is from _ARG_TIMEOUT and is not from @@ -780,7 +831,7 @@ scheduler_dynamic_tmpl_t::rebalance_queues( // We remove from the back to avoid penalizing the next-to-run entries // at the front of the queue by putting them at the back of another // queue. - status = this->pop_from_ready_queue_hold_locks( + status = pop_from_ready_queue_hold_locks( i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next, /*from_back=*/true); if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { @@ -805,7 +856,7 @@ scheduler_dynamic_tmpl_t::rebalance_queues( input.binding.find(i) != input.binding.end()) { VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", iteration, i, ordinal); - this->add_to_ready_queue_hold_locks(i, &input); + add_to_ready_queue_hold_locks(i, &input); } else { incompatible_inputs.push_back(ordinal); } @@ -912,6 +963,62 @@ scheduler_dynamic_tmpl_t::ready_queue_empty( return outputs_[output].ready_queue.queue.empty(); } +template +void +scheduler_dynamic_tmpl_t::add_to_unscheduled_queue( + input_info_t *input) +{ + assert(input->lock->owned_by_cur_thread()); + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + assert(input->unscheduled && + input->blocked_time == 0); // Else should be in regular queue. + VPRINT(this, 4, "add_to_unscheduled_queue (pre-size %zu): input %d priority %d\n", + unscheduled_priority_.queue.size(), input->index, input->priority); + input->queue_counter = ++unscheduled_priority_.fifo_counter; + unscheduled_priority_.queue.push(input); + input->prev_output = input->containing_output; + input->containing_output = sched_type_t::INVALID_INPUT_ORDINAL; +} + +template +void +scheduler_dynamic_tmpl_t::add_to_ready_queue_hold_locks( + output_ordinal_t output, input_info_t *input) +{ + assert(input->lock->owned_by_cur_thread()); + assert(!this->need_output_lock() || + outputs_[output].ready_queue.lock->owned_by_cur_thread()); + if (input->unscheduled && input->blocked_time == 0) { + // Ensure we get prev_output set for start-unscheduled so they won't + // all resume on output #0 but rather on the initial round-robin assignment. + input->containing_output = output; + add_to_unscheduled_queue(input); + return; + } + assert(input->binding.empty() || input->binding.find(output) != input->binding.end()); + VPRINT(this, 4, + "add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta " + "%" PRIu64 " block time %" PRIu64 " start time %" PRIu64 "\n", + outputs_[output].ready_queue.queue.size(), input->index, input->priority, + input->reader->get_last_timestamp() - input->base_timestamp, + input->blocked_time, input->blocked_start_time); + if (input->blocked_time > 0) + ++outputs_[output].ready_queue.num_blocked; + input->queue_counter = ++outputs_[output].ready_queue.fifo_counter; + outputs_[output].ready_queue.queue.push(input); + input->containing_output = output; +} + +template +void +scheduler_dynamic_tmpl_t::add_to_ready_queue( + output_ordinal_t output, input_info_t *input) +{ + auto scoped_lock = acquire_scoped_output_lock_if_necessary(output); + std::lock_guard input_lock(*input->lock); + add_to_ready_queue_hold_locks(output, input); +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::pop_from_ready_queue_hold_locks( @@ -1037,7 +1144,7 @@ scheduler_dynamic_tmpl_t::pop_from_ready_queue_hold_lock // Re-add the blocked ones to the back. for (input_info_t *save : blocked) { std::lock_guard input_lock(*save->lock); - this->add_to_ready_queue_hold_locks(from_output, save); + add_to_ready_queue_hold_locks(from_output, save); } auto res_lock = (res == nullptr) ? std::unique_lock() : std::unique_lock(*res->lock); @@ -1098,6 +1205,46 @@ scheduler_dynamic_tmpl_t::pop_from_ready_queue( return status; } +template +void +scheduler_dynamic_tmpl_t::print_queue_stats() +{ + size_t unsched_size = 0; + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + int live = this->live_input_count_.load(std::memory_order_acquire); + // Make our multi-line output more atomic. + std::ostringstream ostr; + ostr << "Queue snapshot: inputs: " << live - unsched_size << " schedulable, " + << unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n"; + for (unsigned int i = 0; i < outputs_.size(); ++i) { + auto lock = acquire_scoped_output_lock_if_necessary(i); + uint64_t cur_time = get_output_time(i); + ostr << " out #" << i << " @" << cur_time << ": running #" + << outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size() + << " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n"; + std::set readd; + input_info_t *res = nullptr; + while (!outputs_[i].ready_queue.queue.empty()) { + res = outputs_[i].ready_queue.queue.top(); + readd.insert(res); + outputs_[i].ready_queue.queue.pop(); + std::lock_guard input_lock(*res->lock); + if (res->blocked_time > 0) { + ostr << " " << res->index << " still blocked for " + << res->blocked_time - (cur_time - res->blocked_start_time) << "\n"; + } + } + // Re-add the ones we skipped, but without changing their counters so we + // preserve the prior FIFO order. + for (input_info_t *add : readd) + outputs_[i].ready_queue.queue.push(add); + } + VPRINT(this, 0, "%s\n", ostr.str().c_str()); +} + template class scheduler_dynamic_tmpl_t; template class scheduler_dynamic_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index c8f4a53b222..9075c2b5dc6 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -97,6 +97,23 @@ scheduler_fixed_tmpl_t::set_initial_schedule( return sched_type_t::STATUS_SUCCESS; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_fixed_tmpl_t::swap_out_input( + output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) +{ + return sched_type_t::STATUS_OK; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_fixed_tmpl_t::swap_in_input(output_ordinal_t output, + input_ordinal_t input) +{ + return sched_type_t::STATUS_OK; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_fixed_tmpl_t::pick_next_input_for_mode( diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 7055ade2cea..d5329be4050 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -663,12 +663,6 @@ scheduler_impl_tmpl_t::~scheduler_impl_tmpl_t() outputs_[i].ready_queue.lock->get_count_contended()); #endif } -#ifndef NDEBUG - VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock acquired", - unscheduled_priority_.lock->get_count_acquired()); - VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock contended", - unscheduled_priority_.lock->get_count_contended()); -#endif } template @@ -2184,63 +2178,6 @@ scheduler_impl_tmpl_t::close_schedule_segment( return sched_type_t::STATUS_OK; } -template -void -scheduler_impl_tmpl_t::add_to_unscheduled_queue( - input_info_t *input) -{ - assert(input->lock->owned_by_cur_thread()); - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - assert(input->unscheduled && - input->blocked_time == 0); // Else should be in regular queue. - VPRINT(this, 4, "add_to_unscheduled_queue (pre-size %zu): input %d priority %d\n", - unscheduled_priority_.queue.size(), input->index, input->priority); - input->queue_counter = ++unscheduled_priority_.fifo_counter; - unscheduled_priority_.queue.push(input); - input->prev_output = input->containing_output; - input->containing_output = sched_type_t::INVALID_INPUT_ORDINAL; -} - -template -void -scheduler_impl_tmpl_t::add_to_ready_queue_hold_locks( - output_ordinal_t output, input_info_t *input) -{ - assert(input->lock->owned_by_cur_thread()); - assert(!need_output_lock() || - outputs_[output].ready_queue.lock->owned_by_cur_thread()); - if (input->unscheduled && input->blocked_time == 0) { - // Ensure we get prev_output set for start-unscheduled so they won't - // all resume on output #0 but rather on the initial round-robin assignment. - input->containing_output = output; - add_to_unscheduled_queue(input); - return; - } - assert(input->binding.empty() || input->binding.find(output) != input->binding.end()); - VPRINT( - this, 4, - "add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta %" PRIu64 - " block time %" PRIu64 " start time %" PRIu64 "\n", - outputs_[output].ready_queue.queue.size(), input->index, input->priority, - input->reader->get_last_timestamp() - input->base_timestamp, input->blocked_time, - input->blocked_start_time); - if (input->blocked_time > 0) - ++outputs_[output].ready_queue.num_blocked; - input->queue_counter = ++outputs_[output].ready_queue.fifo_counter; - outputs_[output].ready_queue.queue.push(input); - input->containing_output = output; -} - -template -void -scheduler_impl_tmpl_t::add_to_ready_queue(output_ordinal_t output, - input_info_t *input) -{ - auto scoped_lock = acquire_scoped_output_lock_if_necessary(output); - std::lock_guard input_lock(*input->lock); - add_to_ready_queue_hold_locks(output, input); -} - template uint64_t scheduler_impl_tmpl_t::scale_blocked_time( @@ -2270,31 +2207,34 @@ scheduler_impl_tmpl_t::set_cur_input( assert(output >= 0 && output < static_cast(outputs_.size())); // 'input' might be sched_type_t::INVALID_INPUT_ORDINAL. assert(input < static_cast(inputs_.size())); + // The caller should never hold the input lock for MAP_TO_ANY_OUTPUT. + assert(options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT || + !caller_holds_cur_input_lock); int prev_input = outputs_[output].cur_input; if (prev_input >= 0) { if (prev_input != input) { input_info_t &prev_info = inputs_[prev_input]; - auto scoped_lock = caller_holds_cur_input_lock - ? std::unique_lock() - : std::unique_lock(*prev_info.lock); - prev_info.cur_output = sched_type_t::INVALID_OUTPUT_ORDINAL; - prev_info.last_run_time = get_output_time(output); - if (options_.schedule_record_ostream != nullptr) { - stream_status_t status = close_schedule_segment(output, prev_info); - if (status != sched_type_t::STATUS_OK) - return status; + // This is different from prev_workload below: that waits for a valid + // new workload (so swap-in trigger). + int workload = -1; + { + auto scoped_lock = caller_holds_cur_input_lock + ? std::unique_lock() + : std::unique_lock(*prev_info.lock); + prev_info.cur_output = sched_type_t::INVALID_OUTPUT_ORDINAL; + prev_info.last_run_time = get_output_time(output); + if (options_.schedule_record_ostream != nullptr) { + stream_status_t status = close_schedule_segment(output, prev_info); + if (status != sched_type_t::STATUS_OK) + return status; + } + workload = prev_info.workload; } - } - // Now that we've updated prev_info, add it to the ready queue (once on the - // queues others can see it and pop it off, though they can't access/modify its - // fields (for identifying if it can be migrated, e.g.) until we release the - // input lock, so it should be safe to add first, but this order is more - // straightforward). - if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT && prev_input != input && - !inputs_[prev_input].at_eof) { - // The caller should never hold the input lock for MAP_TO_ANY_OUTPUT. - assert(!caller_holds_cur_input_lock); - add_to_ready_queue(output, &inputs_[prev_input]); + // Let subclasses act on the outgoing input. + stream_status_t res = + swap_out_input(output, prev_input, workload, caller_holds_cur_input_lock); + if (res != sched_type_t::STATUS_OK) + return res; } } else if (options_.schedule_record_ostream != nullptr && (outputs_[output].record.back().type == schedule_record_t::IDLE || @@ -2398,6 +2338,12 @@ scheduler_impl_tmpl_t::set_cur_input( return status; } } + + // Let subclasses act on the incoming input. + stream_status_t res = swap_in_input(output, input); + if (res != sched_type_t::STATUS_OK) + return res; + return sched_type_t::STATUS_OK; } @@ -2425,19 +2371,6 @@ typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::pick_next_input(output_ordinal_t output, uint64_t blocked_time) { - VDO(this, 1, { - static int64_t global_heartbeat; - // 10K is too frequent for simple analyzer runs: it is too noisy with - // the new core-sharded-by-default for new users using defaults. - // 50K is a reasonable compromise. - // XXX: Add a runtime option to tweak this. - static constexpr int64_t GLOBAL_HEARTBEAT_CADENCE = 50000; - // We are ok with races as the cadence is approximate. - if (++global_heartbeat % GLOBAL_HEARTBEAT_CADENCE == 0) { - print_queue_stats(); - } - }); - stream_status_t res = sched_type_t::STATUS_OK; const input_ordinal_t prev_index = outputs_[output].cur_input; input_ordinal_t index = sched_type_t::INVALID_INPUT_ORDINAL; @@ -2934,46 +2867,6 @@ scheduler_impl_tmpl_t::get_statistic( return static_cast(outputs_[output].stats[stat]); } -template -void -scheduler_impl_tmpl_t::print_queue_stats() -{ - size_t unsched_size = 0; - { - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - unsched_size = unscheduled_priority_.queue.size(); - } - int live = live_input_count_.load(std::memory_order_acquire); - // Make our multi-line output more atomic. - std::ostringstream ostr; - ostr << "Queue snapshot: inputs: " << live - unsched_size << " schedulable, " - << unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n"; - for (unsigned int i = 0; i < outputs_.size(); ++i) { - auto lock = acquire_scoped_output_lock_if_necessary(i); - uint64_t cur_time = get_output_time(i); - ostr << " out #" << i << " @" << cur_time << ": running #" - << outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size() - << " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n"; - std::set readd; - input_info_t *res = nullptr; - while (!outputs_[i].ready_queue.queue.empty()) { - res = outputs_[i].ready_queue.queue.top(); - readd.insert(res); - outputs_[i].ready_queue.queue.pop(); - std::lock_guard input_lock(*res->lock); - if (res->blocked_time > 0) { - ostr << " " << res->index << " still blocked for " - << res->blocked_time - (cur_time - res->blocked_start_time) << "\n"; - } - } - // Re-add the ones we skipped, but without changing their counters so we preserve - // the prior FIFO order. - for (input_info_t *add : readd) - outputs_[i].ready_queue.queue.push(add); - } - VPRINT(this, 0, "%s\n", ostr.str().c_str()); -} - template class scheduler_impl_tmpl_t; template class scheduler_impl_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index e571842085c..c1c3149ccb1 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -247,6 +247,8 @@ template class scheduler_impl_tmpl_t // The units are in simuilation time. uint64_t blocked_time = 0; uint64_t blocked_start_time = 0; + // XXX i#6831: Move fields like this to be specific to subclasses by changing + // inputs_ to a vector of unique_ptr and then subclassing input_info_t. // An input can be "unscheduled" and not on the ready_priority_ run queue at all // with an infinite timeout until directly targeted. Such inputs are stored // in the unscheduled_priority_ queue. @@ -383,29 +385,9 @@ template class scheduler_impl_tmpl_t std::unique_lock acquire_scoped_output_lock_if_necessary(output_ordinal_t output); - // If input->unscheduled is true and input->blocked_time is 0, input - // is placed on the unscheduled_priority_ queue instead. - // The caller cannot hold the input's lock: this routine will acquire it. - void - add_to_ready_queue(output_ordinal_t output, input_info_t *input); - - // Identical to add_to_ready_queue() except the output's lock must be held by the - // caller. - // The caller must also hold the input's lock. - void - add_to_ready_queue_hold_locks(output_ordinal_t output, input_info_t *input); - - // The caller must hold the input's lock. - void - add_to_unscheduled_queue(input_info_t *input); - uint64_t scale_blocked_time(uint64_t initial_time) const; - // Up to the caller to check verbosity before calling. - void - print_queue_stats(); - void update_switch_stats(output_ordinal_t output, input_ordinal_t prev_input, input_ordinal_t new_input); @@ -451,6 +433,8 @@ template class scheduler_impl_tmpl_t input_ordinal_t cur_input = sched_type_t::INVALID_INPUT_ORDINAL; // Holds the prior non-invalid input. input_ordinal_t prev_input = sched_type_t::INVALID_INPUT_ORDINAL; + // XXX i#6831: Move fields like this to be specific to subclasses by changing + // inputs_ to a vector of unique_ptr and then subclassing output_info_t. // For static schedules we can populate this up front and avoid needing a // lock for dynamically finding the next input, keeping things parallel. std::vector input_indices; @@ -570,6 +554,21 @@ template class scheduler_impl_tmpl_t virtual scheduler_status_t set_initial_schedule(std::unordered_map> &workload2inputs) = 0; + // When an output's input changes from a valid (not INVALID_INPUT_ORDINAL) input to + // something else, swap_out_input() is called on the outgoing input (whose lock is + // held if "caller_holds_input_lock" is true; it will never be true for + // MAP_TO_ANY_OUTPUT). This is called after the input's fields (such as cur_output + // and last_run_time) have been updated. + virtual stream_status_t + swap_out_input(output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) = 0; + + // When an output's input changes to a valid (not INVALID_INPUT_ORDINAL) input + // different from the previous input, swap_in_input() is called on the incoming + // input (whose lock is always held by the caller). + virtual stream_status_t + swap_in_input(output_ordinal_t output, input_ordinal_t input) = 0; + // Allow subclasses to perform custom initial marker processing during // get_initial_input_content(). Returns whether to keep reading. // The caller will stop calling when an instruction record is reached. @@ -928,11 +927,6 @@ template class scheduler_impl_tmpl_t // ready_queue-related plus record and record_index fields which are accessed under // the output's own lock. std::vector outputs_; - // This lock protects unscheduled_priority_ and unscheduled_counter_. - // It should be acquired *after* both output or input locks: it is narrowmost. - mutex_dbg_owned unsched_lock_; - // Inputs that are unscheduled indefinitely until directly targeted. - input_queue_t unscheduled_priority_; // Count of inputs not yet at eof. std::atomic live_input_count_; // In replay mode, count of outputs not yet at the end of the replay sequence. @@ -995,6 +989,7 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t; @@ -1008,11 +1003,11 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t::InputTimestampComparator; using typename scheduler_impl_tmpl_t::workload_tid_t; + using typename scheduler_impl_tmpl_t::input_queue_t; using scheduler_impl_tmpl_t::options_; using scheduler_impl_tmpl_t::outputs_; using scheduler_impl_tmpl_t::inputs_; using scheduler_impl_tmpl_t::error_string_; - using scheduler_impl_tmpl_t::unscheduled_priority_; using scheduler_impl_tmpl_t::set_cur_input; using scheduler_impl_tmpl_t::acquire_scoped_output_lock_if_necessary; @@ -1024,6 +1019,12 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t> &workload2inputs) override; + stream_status_t + swap_out_input(output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) override; + stream_status_t + swap_in_input(output_ordinal_t output, input_ordinal_t input) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; @@ -1058,6 +1059,22 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_tunscheduled is true and input->blocked_time is 0, input + // is placed on the unscheduled_priority_ queue instead. + // The caller cannot hold the input's lock: this routine will acquire it. + void + add_to_ready_queue(output_ordinal_t output, input_info_t *input); + + // Identical to add_to_ready_queue() except the output's lock must be held by the + // caller. + // The caller must also hold the input's lock. + void + add_to_ready_queue_hold_locks(output_ordinal_t output, input_info_t *input); + + // The caller must hold the input's lock. + void + add_to_unscheduled_queue(input_info_t *input); + // "for_output" is which output stream is looking for a new input; only an // input which is able to run on that output will be selected. // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings. @@ -1073,9 +1090,18 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t rebalancer_; std::atomic last_rebalance_time_; + // This lock protects unscheduled_priority_ and unscheduled_counter_. + // It should be acquired *after* both output or input locks: it is narrowmost. + mutex_dbg_owned unsched_lock_; + // Inputs that are unscheduled indefinitely until directly targeted. + input_queue_t unscheduled_priority_; }; // Specialized code for replaying schedules: either a recorded dynamic schedule @@ -1105,6 +1131,12 @@ class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t> &workload2inputs) override; + stream_status_t + swap_out_input(output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) override; + stream_status_t + swap_in_input(output_ordinal_t output, input_ordinal_t input) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; @@ -1146,6 +1178,12 @@ class scheduler_fixed_tmpl_t : public scheduler_impl_tmpl_t> &workload2inputs) override; + stream_status_t + swap_out_input(output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) override; + stream_status_t + swap_in_input(output_ordinal_t output, input_ordinal_t input) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; diff --git a/clients/drcachesim/scheduler/scheduler_replay.cpp b/clients/drcachesim/scheduler/scheduler_replay.cpp index b27a2aa6d41..037c9715dcf 100644 --- a/clients/drcachesim/scheduler/scheduler_replay.cpp +++ b/clients/drcachesim/scheduler/scheduler_replay.cpp @@ -296,6 +296,23 @@ scheduler_replay_tmpl_t::read_and_instantiate_traced_sch return sched_type_t::STATUS_SUCCESS; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_replay_tmpl_t::swap_out_input( + output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) +{ + return sched_type_t::STATUS_OK; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_replay_tmpl_t::swap_in_input(output_ordinal_t output, + input_ordinal_t input) +{ + return sched_type_t::STATUS_OK; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_replay_tmpl_t::pick_next_input_for_mode( diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index a2f6346b3fd..5521ac198a2 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2976,6 +2976,17 @@ class test_scheduler_base_t : public scheduler_impl_t { return sched_type_t::STATUS_ERROR_NOT_IMPLEMENTED; } stream_status_t + swap_out_input(output_ordinal_t output, input_ordinal_t input, int workload, + bool caller_holds_input_lock) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } + stream_status_t + swap_in_input(output_ordinal_t output, input_ordinal_t input) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override {