diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index 9897d010f01..7867f811a4e 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -155,7 +155,7 @@ scheduler_dynamic_tmpl_t::set_output_active( } // Move the ready_queue to other outputs. { - auto lock = this->acquire_scoped_output_lock_if_necessary(output); + auto lock = acquire_scoped_output_lock_if_necessary(output); while (!outputs_[output].ready_queue.queue.empty()) { input_info_t *tomove = outputs_[output].ready_queue.queue.top(); ordinals.push_back(tomove->index); @@ -174,7 +174,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) { - uint64_t cur_time = this->get_output_time(output); + uint64_t cur_time = get_output_time(output); uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); if (last_time == 0) { // Initialize. @@ -200,7 +200,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, blocked_time); inputs_[prev_index].blocked_time = blocked_time; - inputs_[prev_index].blocked_start_time = this->get_output_time(output); + inputs_[prev_index].blocked_start_time = get_output_time(output); } } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && @@ -217,7 +217,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( target_input_lock.unlock(); { auto target_output_lock = - this->acquire_scoped_output_lock_if_necessary(target_output); + acquire_scoped_output_lock_if_necessary(target_output); target_input_lock.lock(); if (out.ready_queue.queue.find(target)) { VPRINT(this, 2, @@ -400,7 +400,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( need_new_input = true; VPRINT(this, 3, "next_record[%d]: input %d going unscheduled\n", output, input->index); - } else if (this->syscall_incurs_switch(input, blocked_time)) { + } else if (syscall_incurs_switch(input, blocked_time)) { // Model as blocking and should switch to a different input. need_new_input = true; VPRINT(this, 3, "next_record[%d]: hit blocking syscall in input %d\n", @@ -475,6 +475,215 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +void +scheduler_dynamic_tmpl_t::process_marker( + input_info_t &input, output_ordinal_t output, trace_marker_type_t marker_type, + uintptr_t marker_value) +{ + assert(input.lock->owned_by_cur_thread()); + switch (marker_type) { + case TRACE_MARKER_TYPE_SYSCALL: + input.processing_syscall = true; + input.pre_syscall_timestamp = input.reader->get_last_timestamp(); + break; + case TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL: + input.processing_maybe_blocking_syscall = true; + // Generally we should already have the timestamp from a just-prior + // syscall marker, but we support tests and other synthetic sequences + // with just a maybe-blocking. + input.pre_syscall_timestamp = input.reader->get_last_timestamp(); + break; + case TRACE_MARKER_TYPE_CONTEXT_SWITCH_START: + outputs_[output].in_context_switch_code = true; + ANNOTATE_FALLTHROUGH; + case TRACE_MARKER_TYPE_SYSCALL_TRACE_START: + outputs_[output].in_kernel_code = true; + break; + case TRACE_MARKER_TYPE_CONTEXT_SWITCH_END: + // We have to delay until the next record. + outputs_[output].hit_switch_code_end = true; + ANNOTATE_FALLTHROUGH; + case TRACE_MARKER_TYPE_SYSCALL_TRACE_END: + outputs_[output].in_kernel_code = false; + break; + case TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH: { + if (!options_.honor_direct_switches) + break; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_ATTEMPTS]; + memref_tid_t target_tid = marker_value; + auto it = this->tid2input_.find(workload_tid_t(input.workload, target_tid)); + if (it == this->tid2input_.end()) { + VPRINT(this, 1, "Failed to find input for target switch thread %" PRId64 "\n", + target_tid); + } else { + input.switch_to_input = it->second; + } + // Trigger a switch either indefinitely or until timeout. + if (input.skip_next_unscheduled) { + // The underlying kernel mechanism being modeled only supports a single + // request: they cannot accumulate. Timing differences in the trace could + // perhaps result in multiple lining up when the didn't in the real app; + // but changing the scheme here could also push representatives in the + // other direction. + input.skip_next_unscheduled = false; + VPRINT(this, 3, + "input %d unschedule request ignored due to prior schedule request " + "@%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + break; + } + input.unscheduled = true; + if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { + // As our scheduling is imperfect we do not risk things being blocked + // indefinitely: we instead have a timeout, but the maximum value. + input.syscall_timeout_arg = options_.block_time_max_us; + if (input.syscall_timeout_arg == 0) + input.syscall_timeout_arg = 1; + } + if (input.syscall_timeout_arg > 0) { + input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); + // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. + if (input.blocked_time == 0) + input.blocked_time = 1; + input.blocked_start_time = get_output_time(output); + VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", + input.index, input.blocked_time, input.reader->get_last_timestamp()); + } else { + VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + } + break; + } + case TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT: + // This is cleared at the post-syscall instr. + input.syscall_timeout_arg = static_cast(marker_value); + break; + case TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE: + if (!options_.honor_direct_switches) + break; + if (input.skip_next_unscheduled) { + input.skip_next_unscheduled = false; + VPRINT(this, 3, + "input %d unschedule request ignored due to prior schedule request " + "@%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + break; + } + // Trigger a switch either indefinitely or until timeout. + input.unscheduled = true; + if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { + // As our scheduling is imperfect we do not risk things being blocked + // indefinitely: we instead have a timeout, but the maximum value. + input.syscall_timeout_arg = options_.block_time_max_us; + if (input.syscall_timeout_arg == 0) + input.syscall_timeout_arg = 1; + } + if (input.syscall_timeout_arg > 0) { + input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); + // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. + if (input.blocked_time == 0) + input.blocked_time = 1; + input.blocked_start_time = get_output_time(output); + VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", + input.index, input.blocked_time, input.reader->get_last_timestamp()); + } else { + VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + } + break; + case TRACE_MARKER_TYPE_SYSCALL_SCHEDULE: { + if (!options_.honor_direct_switches) + break; + memref_tid_t target_tid = marker_value; + auto it = this->tid2input_.find(workload_tid_t(input.workload, target_tid)); + if (it == this->tid2input_.end()) { + VPRINT(this, 1, + "Failed to find input for switchto::resume target tid %" PRId64 "\n", + target_tid); + return; + } + input_ordinal_t target_idx = it->second; + VPRINT(this, 3, "input %d re-scheduling input %d @%" PRIu64 "\n", input.index, + target_idx, input.reader->get_last_timestamp()); + // Release the input lock before acquiring more input locks + input.lock->unlock(); + { + input_info_t *target = &inputs_[target_idx]; + std::unique_lock target_lock(*target->lock); + if (target->at_eof) { + VPRINT(this, 3, "input %d at eof ignoring re-schedule\n", target_idx); + } else if (target->unscheduled) { + target->unscheduled = false; + bool on_unsched_queue = false; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + if (unscheduled_priority_.queue.find(target)) { + unscheduled_priority_.queue.erase(target); + on_unsched_queue = true; + } + } + // We have to give up the unsched lock before calling add_to_ready_queue + // as it acquires the output lock. + if (on_unsched_queue) { + output_ordinal_t resume_output = target->prev_output; + if (resume_output == sched_type_t::INVALID_OUTPUT_ORDINAL) + resume_output = output; + // We can't hold any locks when calling add_to_ready_queue. + // This input is no longer on any queue, so few things can happen + // while we don't hold the input lock: a competing _SCHEDULE will + // not find the output and it can't have blocked_time>0 (invariant + // for things on unsched q); once it's on the new queue we don't + // do anything further here so we're good to go. + target_lock.unlock(); + this->add_to_ready_queue(resume_output, target); + target_lock.lock(); + } else { + // We assume blocked_time is from _ARG_TIMEOUT and is not from + // regularly-blocking i/o. We assume i/o getting into the mix is + // rare enough or does not matter enough to try to have separate + // timeouts. + if (target->blocked_time > 0) { + VPRINT(this, 3, + "switchto::resume erasing blocked time for target " + "input %d\n", + target->index); + output_ordinal_t target_output = target->containing_output; + // There could be no output owner if we're mid-rebalance. + if (target_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { + // We can't hold the input lock to acquire the output lock. + target_lock.unlock(); + { + auto scoped_output_lock = + acquire_scoped_output_lock_if_necessary( + target_output); + output_info_t &out = outputs_[target_output]; + if (out.ready_queue.queue.find(target)) { + --out.ready_queue.num_blocked; + } + // Decrement this holding the lock to synch with + // pop_from_ready_queue(). + target->blocked_time = 0; + } + target_lock.lock(); + } else + target->blocked_time = 0; + } + } + } else { + VPRINT(this, 3, "input %d will skip next unschedule\n", target_idx); + target->skip_next_unscheduled = true; + } + } + input.lock->lock(); + break; + } + default: // Nothing to do. + break; + } +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::rebalance_queues( @@ -490,9 +699,9 @@ scheduler_dynamic_tmpl_t::rebalance_queues( stream_status_t status = sched_type_t::STATUS_OK; assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, - this->get_output_time(triggering_output)); + get_output_time(triggering_output)); // First, update the time to avoid more threads coming here. - last_rebalance_time_.store(this->get_output_time(triggering_output), + last_rebalance_time_.store(get_output_time(triggering_output), std::memory_order_release); VPRINT(this, 2, "Before rebalance:\n"); VDO(this, 2, { this->print_queue_stats(); }); @@ -553,7 +762,7 @@ scheduler_dynamic_tmpl_t::rebalance_queues( for (unsigned int i = 0; i < outputs_.size(); ++i) { if (!outputs_[i].active->load(std::memory_order_acquire)) continue; - auto lock = this->acquire_scoped_output_lock_if_necessary(i); + auto lock = acquire_scoped_output_lock_if_necessary(i); // Only remove on the 1st iteration; later we can exceed due to binding // constraints. while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { @@ -616,6 +825,273 @@ scheduler_dynamic_tmpl_t::rebalance_queues( return status; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::eof_or_idle_for_mode( + output_ordinal_t output, input_ordinal_t prev_input) +{ + int live_inputs = this->live_input_count_.load(std::memory_order_acquire); + if (live_inputs == 0) { + return sched_type_t::STATUS_EOF; + } + if (live_inputs <= + static_cast(inputs_.size() * options_.exit_if_fraction_inputs_left)) { + VPRINT(this, 1, "output %d exiting early with %d live inputs left\n", output, + live_inputs); + return sched_type_t::STATUS_EOF; + } + // Before going idle, try to steal work from another output. + // We start with us+1 to avoid everyone stealing from the low-numbered outputs. + // We only try when we first transition to idle; we rely on rebalancing after + // that, to avoid repeatededly grabbing other output's locks over and over. + if (!outputs_[output].tried_to_steal_on_idle) { + outputs_[output].tried_to_steal_on_idle = true; + for (unsigned int i = 1; i < outputs_.size(); ++i) { + output_ordinal_t target = (output + i) % outputs_.size(); + assert(target != output); // Sanity check (we won't reach "output"). + input_info_t *queue_next = nullptr; + stream_status_t status = pop_from_ready_queue(target, output, queue_next); + if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { + set_cur_input(output, queue_next->index); + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]; + VPRINT(this, 2, + "eof_or_idle: output %d stole input %d from %d's ready_queue\n", + output, queue_next->index, target); + return sched_type_t::STATUS_STOLE; + } + // We didn't find anything; loop and check another output. + } + VPRINT(this, 3, "eof_or_idle: output %d failed to steal from anyone\n", output); + } + return sched_type_t::STATUS_IDLE; +} + +template +bool +scheduler_dynamic_tmpl_t::syscall_incurs_switch( + input_info_t *input, uint64_t &blocked_time) +{ + assert(input->lock->owned_by_cur_thread()); + uint64_t post_time = input->reader->get_last_timestamp(); + assert(input->processing_syscall || input->processing_maybe_blocking_syscall); + if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) { + // This is a legacy trace that does not have timestamps bracketing syscalls. + // We switch on every maybe-blocking syscall in this case and have a simplified + // blocking model. + blocked_time = options_.blocking_switch_threshold; + return input->processing_maybe_blocking_syscall; + } + assert(input->pre_syscall_timestamp > 0); + assert(input->pre_syscall_timestamp <= post_time); + uint64_t latency = post_time - input->pre_syscall_timestamp; + uint64_t threshold = input->processing_maybe_blocking_syscall + ? options_.blocking_switch_threshold + : options_.syscall_switch_threshold; + blocked_time = scale_blocked_time(latency); + VPRINT(this, 3, + "input %d %ssyscall latency %" PRIu64 " * scale %6.3f => blocked time %" PRIu64 + "\n", + input->index, + input->processing_maybe_blocking_syscall ? "maybe-blocking " : "", latency, + options_.block_time_multiplier, blocked_time); + return latency >= threshold; +} + +template +bool +scheduler_dynamic_tmpl_t::ready_queue_empty( + output_ordinal_t output) +{ + auto lock = acquire_scoped_output_lock_if_necessary(output); + return outputs_[output].ready_queue.queue.empty(); +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::pop_from_ready_queue_hold_locks( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input, + bool from_back) +{ + assert(!this->need_output_lock() || + (outputs_[from_output].ready_queue.lock->owned_by_cur_thread() && + (from_output == for_output || + for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || + outputs_[for_output].ready_queue.lock->owned_by_cur_thread()))); + std::set skipped; + std::set blocked; + input_info_t *res = nullptr; + stream_status_t status = sched_type_t::STATUS_OK; + uint64_t cur_time = get_output_time(from_output); + while (!outputs_[from_output].ready_queue.queue.empty()) { + if (from_back) { + res = outputs_[from_output].ready_queue.queue.back(); + outputs_[from_output].ready_queue.queue.erase(res); + } else if (options_.randomize_next_input) { + res = outputs_[from_output].ready_queue.queue.get_random_entry(); + outputs_[from_output].ready_queue.queue.erase(res); + } else { + res = outputs_[from_output].ready_queue.queue.top(); + outputs_[from_output].ready_queue.queue.pop(); + } + std::lock_guard input_lock(*res->lock); + assert(!res->unscheduled || + res->blocked_time > 0); // Should be in unscheduled_priority_. + if (res->binding.empty() || for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || + res->binding.find(for_output) != res->binding.end()) { + // For blocked inputs, as we don't have interrupts or other regular + // control points we only check for being unblocked when an input + // would be chosen to run. We thus keep blocked inputs in the ready queue. + if (res->blocked_time > 0) { + --outputs_[from_output].ready_queue.num_blocked; + if (!options_.honor_infinite_timeouts) { + // cur_time can be 0 at initialization time. + if (res->blocked_start_time == 0 && cur_time > 0) { + // This was a start-unscheduled input: we didn't have a valid + // time at initialization. + res->blocked_start_time = cur_time; + } + } else + assert(cur_time > 0); + } + if (res->blocked_time > 0 && + // cur_time can be 0 at initialization time. + (cur_time == 0 || + // Guard against time going backward (happens for wall-clock: i#6966). + cur_time < res->blocked_start_time || + cur_time - res->blocked_start_time < res->blocked_time)) { + VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n", + res->index, + res->blocked_time - (cur_time - res->blocked_start_time)); + // We keep searching for a suitable input. + blocked.insert(res); + } else { + // This input is no longer blocked. + res->blocked_time = 0; + res->unscheduled = false; + VPRINT(this, 4, "pop queue: %d @ %" PRIu64 " no longer blocked\n", + res->index, cur_time); + // We've found a candidate. One final check if this is a migration. + bool found_candidate = false; + if (from_output == for_output) + found_candidate = true; + else { + assert(cur_time > 0 || res->last_run_time == 0); + if (res->last_run_time == 0) { + // For never-executed inputs we consider their last execution + // to be the very first simulation time, which we can't + // easily initialize until here. + res->last_run_time = outputs_[from_output].initial_cur_time->load( + std::memory_order_acquire); + } + VPRINT(this, 5, + "migration check %d to %d: cur=%" PRIu64 " last=%" PRIu64 + " delta=%" PRId64 " vs thresh %" PRIu64 "\n", + from_output, for_output, cur_time, res->last_run_time, + cur_time - res->last_run_time, + options_.migration_threshold_us); + // Guard against time going backward (happens for wall-clock: i#6966). + if (options_.migration_threshold_us == 0 || + // Allow free movement for the initial load balance at init time. + cur_time == 0 || + (cur_time > res->last_run_time && + cur_time - res->last_run_time >= + static_cast(options_.migration_threshold_us * + options_.time_units_per_us))) { + VPRINT(this, 2, "migrating %d to %d\n", from_output, for_output); + found_candidate = true; + // Do not count an initial rebalance as a migration. + if (cur_time > 0) { + ++outputs_[from_output] + .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + } + } + if (found_candidate) + break; + else + skipped.insert(res); + } + } else { + // We keep searching for a suitable input. + skipped.insert(res); + } + res = nullptr; + } + if (res == nullptr && !blocked.empty()) { + // Do not hand out EOF thinking we're done: we still have inputs blocked + // on i/o, so just wait and retry. + if (for_output != sched_type_t::INVALID_OUTPUT_ORDINAL) + ++outputs_[for_output].idle_count; + status = sched_type_t::STATUS_IDLE; + } + // Re-add the ones we skipped, but without changing their counters so we preserve + // the prior FIFO order. + for (input_info_t *save : skipped) + outputs_[from_output].ready_queue.queue.push(save); + // Re-add the blocked ones to the back. + for (input_info_t *save : blocked) { + std::lock_guard input_lock(*save->lock); + this->add_to_ready_queue_hold_locks(from_output, save); + } + auto res_lock = (res == nullptr) ? std::unique_lock() + : std::unique_lock(*res->lock); + VDO(this, 1, { + static int output_heartbeat; + // We are ok with races as the cadence is approximate. + if (++output_heartbeat % 2000 == 0) { + size_t unsched_size = 0; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + VPRINT(this, 1, + "heartbeat[%d] %zd in queue; %d blocked; %zd unscheduled => %d %d\n", + from_output, outputs_[from_output].ready_queue.queue.size(), + outputs_[from_output].ready_queue.num_blocked, unsched_size, + res == nullptr ? -1 : res->index, status); + } + }); + if (res != nullptr) { + VPRINT(this, 4, + "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " + "delta %" PRIu64 "\n", + from_output, outputs_[from_output].ready_queue.queue.size(), res->index, + res->priority, res->reader->get_last_timestamp() - res->base_timestamp); + res->unscheduled = false; + res->prev_output = res->containing_output; + res->containing_output = for_output; + } + new_input = res; + return status; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::pop_from_ready_queue( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input) +{ + stream_status_t status = sched_type_t::STATUS_OK; + { + std::unique_lock from_lock; + std::unique_lock for_lock; + // If we need both locks, acquire in increasing output order to avoid deadlocks if + // two outputs try to steal from each other. + if (from_output == for_output || + for_output == sched_type_t::INVALID_OUTPUT_ORDINAL) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } else if (from_output < for_output) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + } else { + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } + status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input); + } + return status; +} + template class scheduler_dynamic_tmpl_t; template class scheduler_dynamic_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index 2b7adee2be7..93227e62ee0 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -151,6 +151,18 @@ scheduler_fixed_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_fixed_tmpl_t::eof_or_idle_for_mode( + output_ordinal_t output, input_ordinal_t prev_input) +{ + if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT || + this->live_input_count_.load(std::memory_order_acquire) == 0) { + return sched_type_t::STATUS_EOF; + } + return sched_type_t::STATUS_IDLE; +} + template class scheduler_fixed_tmpl_t; template class scheduler_fixed_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 89f5d65c1a7..77e845e9c7d 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -2184,14 +2184,6 @@ scheduler_impl_tmpl_t::close_schedule_segment( return sched_type_t::STATUS_OK; } -template -bool -scheduler_impl_tmpl_t::ready_queue_empty(output_ordinal_t output) -{ - auto lock = acquire_scoped_output_lock_if_necessary(output); - return outputs_[output].ready_queue.queue.empty(); -} - template void scheduler_impl_tmpl_t::add_to_unscheduled_queue( @@ -2249,192 +2241,6 @@ scheduler_impl_tmpl_t::add_to_ready_queue(output_ordinal add_to_ready_queue_hold_locks(output, input); } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::pop_from_ready_queue_hold_locks( - output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input, - bool from_back) -{ - assert(!need_output_lock() || - (outputs_[from_output].ready_queue.lock->owned_by_cur_thread() && - (from_output == for_output || - for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || - outputs_[for_output].ready_queue.lock->owned_by_cur_thread()))); - std::set skipped; - std::set blocked; - input_info_t *res = nullptr; - stream_status_t status = sched_type_t::STATUS_OK; - uint64_t cur_time = get_output_time(from_output); - while (!outputs_[from_output].ready_queue.queue.empty()) { - if (from_back) { - res = outputs_[from_output].ready_queue.queue.back(); - outputs_[from_output].ready_queue.queue.erase(res); - } else if (options_.randomize_next_input) { - res = outputs_[from_output].ready_queue.queue.get_random_entry(); - outputs_[from_output].ready_queue.queue.erase(res); - } else { - res = outputs_[from_output].ready_queue.queue.top(); - outputs_[from_output].ready_queue.queue.pop(); - } - std::lock_guard input_lock(*res->lock); - assert(!res->unscheduled || - res->blocked_time > 0); // Should be in unscheduled_priority_. - if (res->binding.empty() || for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || - res->binding.find(for_output) != res->binding.end()) { - // For blocked inputs, as we don't have interrupts or other regular - // control points we only check for being unblocked when an input - // would be chosen to run. We thus keep blocked inputs in the ready queue. - if (res->blocked_time > 0) { - --outputs_[from_output].ready_queue.num_blocked; - if (!options_.honor_infinite_timeouts) { - // cur_time can be 0 at initialization time. - if (res->blocked_start_time == 0 && cur_time > 0) { - // This was a start-unscheduled input: we didn't have a valid - // time at initialization. - res->blocked_start_time = cur_time; - } - } else - assert(cur_time > 0); - } - if (res->blocked_time > 0 && - // cur_time can be 0 at initialization time. - (cur_time == 0 || - // Guard against time going backward (happens for wall-clock: i#6966). - cur_time < res->blocked_start_time || - cur_time - res->blocked_start_time < res->blocked_time)) { - VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n", - res->index, - res->blocked_time - (cur_time - res->blocked_start_time)); - // We keep searching for a suitable input. - blocked.insert(res); - } else { - // This input is no longer blocked. - res->blocked_time = 0; - res->unscheduled = false; - VPRINT(this, 4, "pop queue: %d @ %" PRIu64 " no longer blocked\n", - res->index, cur_time); - // We've found a candidate. One final check if this is a migration. - bool found_candidate = false; - if (from_output == for_output) - found_candidate = true; - else { - assert(cur_time > 0 || res->last_run_time == 0); - if (res->last_run_time == 0) { - // For never-executed inputs we consider their last execution - // to be the very first simulation time, which we can't - // easily initialize until here. - res->last_run_time = outputs_[from_output].initial_cur_time->load( - std::memory_order_acquire); - } - VPRINT(this, 5, - "migration check %d to %d: cur=%" PRIu64 " last=%" PRIu64 - " delta=%" PRId64 " vs thresh %" PRIu64 "\n", - from_output, for_output, cur_time, res->last_run_time, - cur_time - res->last_run_time, - options_.migration_threshold_us); - // Guard against time going backward (happens for wall-clock: i#6966). - if (options_.migration_threshold_us == 0 || - // Allow free movement for the initial load balance at init time. - cur_time == 0 || - (cur_time > res->last_run_time && - cur_time - res->last_run_time >= - static_cast(options_.migration_threshold_us * - options_.time_units_per_us))) { - VPRINT(this, 2, "migrating %d to %d\n", from_output, for_output); - found_candidate = true; - // Do not count an initial rebalance as a migration. - if (cur_time > 0) { - ++outputs_[from_output] - .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; - } - } - } - if (found_candidate) - break; - else - skipped.insert(res); - } - } else { - // We keep searching for a suitable input. - skipped.insert(res); - } - res = nullptr; - } - if (res == nullptr && !blocked.empty()) { - // Do not hand out EOF thinking we're done: we still have inputs blocked - // on i/o, so just wait and retry. - if (for_output != sched_type_t::INVALID_OUTPUT_ORDINAL) - ++outputs_[for_output].idle_count; - status = sched_type_t::STATUS_IDLE; - } - // Re-add the ones we skipped, but without changing their counters so we preserve - // the prior FIFO order. - for (input_info_t *save : skipped) - outputs_[from_output].ready_queue.queue.push(save); - // Re-add the blocked ones to the back. - for (input_info_t *save : blocked) { - std::lock_guard input_lock(*save->lock); - add_to_ready_queue_hold_locks(from_output, save); - } - auto res_lock = (res == nullptr) ? std::unique_lock() - : std::unique_lock(*res->lock); - VDO(this, 1, { - static int output_heartbeat; - // We are ok with races as the cadence is approximate. - if (++output_heartbeat % 2000 == 0) { - size_t unsched_size = 0; - { - std::lock_guard unsched_lock( - *unscheduled_priority_.lock); - unsched_size = unscheduled_priority_.queue.size(); - } - VPRINT(this, 1, - "heartbeat[%d] %zd in queue; %d blocked; %zd unscheduled => %d %d\n", - from_output, outputs_[from_output].ready_queue.queue.size(), - outputs_[from_output].ready_queue.num_blocked, unsched_size, - res == nullptr ? -1 : res->index, status); - } - }); - if (res != nullptr) { - VPRINT(this, 4, - "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " - "delta %" PRIu64 "\n", - from_output, outputs_[from_output].ready_queue.queue.size(), res->index, - res->priority, res->reader->get_last_timestamp() - res->base_timestamp); - res->unscheduled = false; - res->prev_output = res->containing_output; - res->containing_output = for_output; - } - new_input = res; - return status; -} - -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::pop_from_ready_queue( - output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input) -{ - stream_status_t status = sched_type_t::STATUS_OK; - { - std::unique_lock from_lock; - std::unique_lock for_lock; - // If we need both locks, acquire in increasing output order to avoid deadlocks if - // two outputs try to steal from each other. - if (from_output == for_output || - for_output == sched_type_t::INVALID_OUTPUT_ORDINAL) { - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - } else if (from_output < for_output) { - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - for_lock = acquire_scoped_output_lock_if_necessary(for_output); - } else { - for_lock = acquire_scoped_output_lock_if_necessary(for_output); - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - } - status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input); - } - return status; -} - template uint64_t scheduler_impl_tmpl_t::scale_blocked_time( @@ -2453,37 +2259,6 @@ scheduler_impl_tmpl_t::scale_blocked_time( return static_cast(scaled_us * options_.time_units_per_us); } -template -bool -scheduler_impl_tmpl_t::syscall_incurs_switch( - input_info_t *input, uint64_t &blocked_time) -{ - assert(input->lock->owned_by_cur_thread()); - uint64_t post_time = input->reader->get_last_timestamp(); - assert(input->processing_syscall || input->processing_maybe_blocking_syscall); - if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) { - // This is a legacy trace that does not have timestamps bracketing syscalls. - // We switch on every maybe-blocking syscall in this case and have a simplified - // blocking model. - blocked_time = options_.blocking_switch_threshold; - return input->processing_maybe_blocking_syscall; - } - assert(input->pre_syscall_timestamp > 0); - assert(input->pre_syscall_timestamp <= post_time); - uint64_t latency = post_time - input->pre_syscall_timestamp; - uint64_t threshold = input->processing_maybe_blocking_syscall - ? options_.blocking_switch_threshold - : options_.syscall_switch_threshold; - blocked_time = scale_blocked_time(latency); - VPRINT(this, 3, - "input %d %ssyscall latency %" PRIu64 " * scale %6.3f => blocked time %" PRIu64 - "\n", - input->index, - input->processing_maybe_blocking_syscall ? "maybe-blocking " : "", latency, - options_.block_time_multiplier, blocked_time); - return latency >= threshold; -} - template typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::set_cur_input( @@ -2716,215 +2491,6 @@ scheduler_impl_tmpl_t::update_switch_stats( } } -template -void -scheduler_impl_tmpl_t::process_marker( - input_info_t &input, output_ordinal_t output, trace_marker_type_t marker_type, - uintptr_t marker_value) -{ - assert(input.lock->owned_by_cur_thread()); - switch (marker_type) { - case TRACE_MARKER_TYPE_SYSCALL: - input.processing_syscall = true; - input.pre_syscall_timestamp = input.reader->get_last_timestamp(); - break; - case TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL: - input.processing_maybe_blocking_syscall = true; - // Generally we should already have the timestamp from a just-prior - // syscall marker, but we support tests and other synthetic sequences - // with just a maybe-blocking. - input.pre_syscall_timestamp = input.reader->get_last_timestamp(); - break; - case TRACE_MARKER_TYPE_CONTEXT_SWITCH_START: - outputs_[output].in_context_switch_code = true; - ANNOTATE_FALLTHROUGH; - case TRACE_MARKER_TYPE_SYSCALL_TRACE_START: - outputs_[output].in_kernel_code = true; - break; - case TRACE_MARKER_TYPE_CONTEXT_SWITCH_END: - // We have to delay until the next record. - outputs_[output].hit_switch_code_end = true; - ANNOTATE_FALLTHROUGH; - case TRACE_MARKER_TYPE_SYSCALL_TRACE_END: - outputs_[output].in_kernel_code = false; - break; - case TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH: { - if (!options_.honor_direct_switches) - break; - ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_ATTEMPTS]; - memref_tid_t target_tid = marker_value; - auto it = tid2input_.find(workload_tid_t(input.workload, target_tid)); - if (it == tid2input_.end()) { - VPRINT(this, 1, "Failed to find input for target switch thread %" PRId64 "\n", - target_tid); - } else { - input.switch_to_input = it->second; - } - // Trigger a switch either indefinitely or until timeout. - if (input.skip_next_unscheduled) { - // The underlying kernel mechanism being modeled only supports a single - // request: they cannot accumulate. Timing differences in the trace could - // perhaps result in multiple lining up when the didn't in the real app; - // but changing the scheme here could also push representatives in the - // other direction. - input.skip_next_unscheduled = false; - VPRINT(this, 3, - "input %d unschedule request ignored due to prior schedule request " - "@%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - break; - } - input.unscheduled = true; - if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { - // As our scheduling is imperfect we do not risk things being blocked - // indefinitely: we instead have a timeout, but the maximum value. - input.syscall_timeout_arg = options_.block_time_max_us; - if (input.syscall_timeout_arg == 0) - input.syscall_timeout_arg = 1; - } - if (input.syscall_timeout_arg > 0) { - input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); - // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. - if (input.blocked_time == 0) - input.blocked_time = 1; - input.blocked_start_time = get_output_time(output); - VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", - input.index, input.blocked_time, input.reader->get_last_timestamp()); - } else { - VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - } - break; - } - case TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT: - // This is cleared at the post-syscall instr. - input.syscall_timeout_arg = static_cast(marker_value); - break; - case TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE: - if (!options_.honor_direct_switches) - break; - if (input.skip_next_unscheduled) { - input.skip_next_unscheduled = false; - VPRINT(this, 3, - "input %d unschedule request ignored due to prior schedule request " - "@%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - break; - } - // Trigger a switch either indefinitely or until timeout. - input.unscheduled = true; - if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { - // As our scheduling is imperfect we do not risk things being blocked - // indefinitely: we instead have a timeout, but the maximum value. - input.syscall_timeout_arg = options_.block_time_max_us; - if (input.syscall_timeout_arg == 0) - input.syscall_timeout_arg = 1; - } - if (input.syscall_timeout_arg > 0) { - input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); - // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. - if (input.blocked_time == 0) - input.blocked_time = 1; - input.blocked_start_time = get_output_time(output); - VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", - input.index, input.blocked_time, input.reader->get_last_timestamp()); - } else { - VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - } - break; - case TRACE_MARKER_TYPE_SYSCALL_SCHEDULE: { - if (!options_.honor_direct_switches) - break; - memref_tid_t target_tid = marker_value; - auto it = tid2input_.find(workload_tid_t(input.workload, target_tid)); - if (it == tid2input_.end()) { - VPRINT(this, 1, - "Failed to find input for switchto::resume target tid %" PRId64 "\n", - target_tid); - return; - } - input_ordinal_t target_idx = it->second; - VPRINT(this, 3, "input %d re-scheduling input %d @%" PRIu64 "\n", input.index, - target_idx, input.reader->get_last_timestamp()); - // Release the input lock before acquiring more input locks - input.lock->unlock(); - { - input_info_t *target = &inputs_[target_idx]; - std::unique_lock target_lock(*target->lock); - if (target->at_eof) { - VPRINT(this, 3, "input %d at eof ignoring re-schedule\n", target_idx); - } else if (target->unscheduled) { - target->unscheduled = false; - bool on_unsched_queue = false; - { - std::lock_guard unsched_lock( - *unscheduled_priority_.lock); - if (unscheduled_priority_.queue.find(target)) { - unscheduled_priority_.queue.erase(target); - on_unsched_queue = true; - } - } - // We have to give up the unsched lock before calling add_to_ready_queue - // as it acquires the output lock. - if (on_unsched_queue) { - output_ordinal_t resume_output = target->prev_output; - if (resume_output == sched_type_t::INVALID_OUTPUT_ORDINAL) - resume_output = output; - // We can't hold any locks when calling add_to_ready_queue. - // This input is no longer on any queue, so few things can happen - // while we don't hold the input lock: a competing _SCHEDULE will - // not find the output and it can't have blocked_time>0 (invariant - // for things on unsched q); once it's on the new queue we don't - // do anything further here so we're good to go. - target_lock.unlock(); - add_to_ready_queue(resume_output, target); - target_lock.lock(); - } else { - // We assume blocked_time is from _ARG_TIMEOUT and is not from - // regularly-blocking i/o. We assume i/o getting into the mix is - // rare enough or does not matter enough to try to have separate - // timeouts. - if (target->blocked_time > 0) { - VPRINT(this, 3, - "switchto::resume erasing blocked time for target " - "input %d\n", - target->index); - output_ordinal_t target_output = target->containing_output; - // There could be no output owner if we're mid-rebalance. - if (target_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { - // We can't hold the input lock to acquire the output lock. - target_lock.unlock(); - { - auto scoped_output_lock = - acquire_scoped_output_lock_if_necessary( - target_output); - output_info_t &out = outputs_[target_output]; - if (out.ready_queue.queue.find(target)) { - --out.ready_queue.num_blocked; - } - // Decrement this holding the lock to synch with - // pop_from_ready_queue(). - target->blocked_time = 0; - } - target_lock.lock(); - } else - target->blocked_time = 0; - } - } - } else { - VPRINT(this, 3, "input %d will skip next unschedule\n", target_idx); - target->skip_next_unscheduled = true; - } - } - input.lock->lock(); - break; - } - default: // Nothing to do. - break; - } -} - template typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::next_record(output_ordinal_t output, @@ -3325,55 +2891,10 @@ typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::eof_or_idle(output_ordinal_t output, input_ordinal_t prev_input) { - // XXX i#6831: Refactor to use subclasses or templates to specialize - // scheduler code based on mapping options, to avoid these top-level - // conditionals in many functions? - int live_inputs = live_input_count_.load(std::memory_order_acquire); - if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT || live_inputs == 0 || - // While a full schedule recorded should have each input hit either its - // EOF or ROI end, we have a fallback to avoid hangs for possible recorded - // schedules that end an input early deliberately without an ROI. - (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY && - live_replay_output_count_.load(std::memory_order_acquire) == 0)) { - assert(options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY || - outputs_[output].at_eof); - return sched_type_t::STATUS_EOF; - } - if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT && - live_inputs <= - static_cast(inputs_.size() * options_.exit_if_fraction_inputs_left)) { - VPRINT(this, 1, "output %d exiting early with %d live inputs left\n", output, - live_inputs); - return sched_type_t::STATUS_EOF; - } - if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) { - // Before going idle, try to steal work from another output. - // We start with us+1 to avoid everyone stealing from the low-numbered outputs. - // We only try when we first transition to idle; we rely on rebalancing after - // that, to avoid repeatededly grabbing other output's locks over and over. - if (!outputs_[output].tried_to_steal_on_idle) { - outputs_[output].tried_to_steal_on_idle = true; - for (unsigned int i = 1; i < outputs_.size(); ++i) { - output_ordinal_t target = (output + i) % outputs_.size(); - assert(target != output); // Sanity check (we won't reach "output"). - input_info_t *queue_next = nullptr; - stream_status_t status = pop_from_ready_queue(target, output, queue_next); - if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { - set_cur_input(output, queue_next->index); - ++outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]; - VPRINT( - this, 2, - "eof_or_idle: output %d stole input %d from %d's ready_queue\n", - output, queue_next->index, target); - return sched_type_t::STATUS_STOLE; - } - // We didn't find anything; loop and check another output. - } - VPRINT(this, 3, "eof_or_idle: output %d failed to steal from anyone\n", - output); - } - } + stream_status_t res = eof_or_idle_for_mode(output, prev_input); + assert(res != sched_type_t::STATUS_OK); + if (res != sched_type_t::STATUS_IDLE) + return res; // We rely on rebalancing to handle the case of every input being unscheduled. outputs_[output].waiting = true; if (prev_input != sched_type_t::INVALID_INPUT_ORDINAL) diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index a8c84f7cc55..eb7e51ffe11 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -383,9 +383,6 @@ template class scheduler_impl_tmpl_t std::unique_lock acquire_scoped_output_lock_if_necessary(output_ordinal_t output); - bool - ready_queue_empty(output_ordinal_t output); - // If input->unscheduled is true and input->blocked_time is 0, input // is placed on the unscheduled_priority_ queue instead. // The caller cannot hold the input's lock: this routine will acquire it. @@ -405,26 +402,6 @@ template class scheduler_impl_tmpl_t uint64_t scale_blocked_time(uint64_t initial_time) const; - // The input's lock must be held by the caller. - // Returns a multiplier for how long the input should be considered blocked. - bool - syscall_incurs_switch(input_info_t *input, uint64_t &blocked_time); - - // "for_output" is which output stream is looking for a new input; only an - // input which is able to run on that output will be selected. - // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings. - // If from_output != for_output (including for_output == INVALID_OUTPUT_ORDINAL) - // this is a migration and only migration-ready inputs will be picked. - stream_status_t - pop_from_ready_queue(output_ordinal_t from_output, output_ordinal_t for_output, - input_info_t *&new_input); - - // Identical to pop_from_ready_queue but the caller must hold both output locks. - stream_status_t - pop_from_ready_queue_hold_locks(output_ordinal_t from_output, - output_ordinal_t for_output, input_info_t *&new_input, - bool from_back = false); - // Up to the caller to check verbosity before calling. void print_queue_stats(); @@ -821,12 +798,6 @@ template class scheduler_impl_tmpl_t void print_record(const RecordType &record); - // Process each marker seen for MAP_TO_ANY_OUTPUT during next_record(). - // The input's lock must be held by the caller. - virtual void - process_marker(input_info_t &input, output_ordinal_t output, - trace_marker_type_t marker_type, uintptr_t marker_value); - // Returns the get_stream_name() value for the current input stream scheduled on // the 'output_ordinal'-th output stream. std::string @@ -909,6 +880,11 @@ template class scheduler_impl_tmpl_t stream_status_t eof_or_idle(output_ordinal_t output, input_ordinal_t prev_input); + // mapping_t-mode specific actions when one output runs out of things to do. + // Success return values are either STATUS_IDLE or STATUS_EOF. + virtual stream_status_t + eof_or_idle_for_mode(output_ordinal_t output, input_ordinal_t prev_input) = 0; + // Returns whether the current record for the current input stream scheduled on // the 'output_ordinal'-th output stream is from a part of the trace corresponding // to kernel execution. @@ -1011,12 +987,17 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t::schedule_record_t; using typename scheduler_impl_tmpl_t::InputTimestampComparator; + using typename scheduler_impl_tmpl_t::workload_tid_t; using scheduler_impl_tmpl_t::options_; using scheduler_impl_tmpl_t::outputs_; using scheduler_impl_tmpl_t::inputs_; using scheduler_impl_tmpl_t::error_string_; using scheduler_impl_tmpl_t::unscheduled_priority_; using scheduler_impl_tmpl_t::set_cur_input; + using scheduler_impl_tmpl_t::acquire_scoped_output_lock_if_necessary; + using scheduler_impl_tmpl_t::get_output_time; + using scheduler_impl_tmpl_t::scale_blocked_time; protected: scheduler_status_t @@ -1031,13 +1012,46 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t inputs_to_add); + bool + ready_queue_empty(output_ordinal_t output); + + // "for_output" is which output stream is looking for a new input; only an + // input which is able to run on that output will be selected. + // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings. + // If from_output != for_output (including for_output == INVALID_OUTPUT_ORDINAL) + // this is a migration and only migration-ready inputs will be picked. + stream_status_t + pop_from_ready_queue(output_ordinal_t from_output, output_ordinal_t for_output, + input_info_t *&new_input); + + // Identical to pop_from_ready_queue but the caller must hold both output locks. + stream_status_t + pop_from_ready_queue_hold_locks(output_ordinal_t from_output, + output_ordinal_t for_output, input_info_t *&new_input, + bool from_back = false); + // Rebalancing coordination. std::atomic rebalancer_; std::atomic last_rebalance_time_; @@ -1078,6 +1092,10 @@ class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_replay_tmpl_t::eof_or_idle_for_mode( + output_ordinal_t output, input_ordinal_t prev_input) +{ + if (this->live_input_count_.load(std::memory_order_acquire) == 0 || + // While a full schedule recorded should have each input hit either its + // EOF or ROI end, we have a fallback to avoid hangs for possible recorded + // schedules that end an input early deliberately without an ROI. + (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY && + this->live_replay_output_count_.load(std::memory_order_acquire) == 0)) { + assert(options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY || + outputs_[output].at_eof); + return sched_type_t::STATUS_EOF; + } + return sched_type_t::STATUS_IDLE; +} + template class scheduler_replay_tmpl_t; template class scheduler_replay_tmpl_t; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 586807c192d..082f64b89b3 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2988,6 +2988,11 @@ class test_scheduler_base_t : public scheduler_impl_t { { return sched_type_t::STATUS_NOT_IMPLEMENTED; } + stream_status_t + eof_or_idle_for_mode(output_ordinal_t output, input_ordinal_t prev_input) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } }; class test_scheduler_t : public test_scheduler_base_t { public: