From 9d56777ed3962b7d447958439f329056b629ea1b Mon Sep 17 00:00:00 2001 From: Mark Oostveen Date: Wed, 24 Jan 2024 00:18:54 +0100 Subject: [PATCH] WIP --- JobSystem/Src/JobSystem/AtomicMutex.h | 8 +- JobSystem/Src/JobSystem/JobSystem.cpp | 81 +++------- JobSystem/Src/JobSystem/WorkerThread.cpp | 183 +++++++++++++++++------ JobSystem/Src/JobSystem/WorkerThread.h | 13 +- 4 files changed, 172 insertions(+), 113 deletions(-) diff --git a/JobSystem/Src/JobSystem/AtomicMutex.h b/JobSystem/Src/JobSystem/AtomicMutex.h index 1cff738..c908b6b 100644 --- a/JobSystem/Src/JobSystem/AtomicMutex.h +++ b/JobSystem/Src/JobSystem/AtomicMutex.h @@ -12,7 +12,13 @@ namespace JbSystem mutex(mutex&&) = delete; mutex& operator=(const mutex&) = delete; mutex& operator=(mutex&&) = delete; - ~mutex() noexcept { unlock(); } + ~mutex() noexcept + { + if (_flag.load()) + { + unlock(); + } + } bool try_lock() noexcept { diff --git a/JobSystem/Src/JobSystem/JobSystem.cpp b/JobSystem/Src/JobSystem/JobSystem.cpp index 200eeac..a327da9 100644 --- a/JobSystem/Src/JobSystem/JobSystem.cpp +++ b/JobSystem/Src/JobSystem/JobSystem.cpp @@ -175,7 +175,7 @@ namespace JbSystem wasActive = false; for (JobSystemWorker& worker : _workers) { - if (!worker._isRunning.load()) + if (!worker.IsRunning()) { continue; } @@ -714,7 +714,9 @@ namespace JbSystem // Wait for task to complete, allocate boolean on the heap because it's possible that we do not have access to our stack auto* finished = new (location) std::atomic(false); - auto waitLambda = [finished]() { finished->store(true); }; + auto waitLambda = [finished]() { + finished->store(true); + }; ScheduleAfterJobCompletion({jobId}, maximumHelpEffort, waitLambda); int waitingPeriod = 0; @@ -842,9 +844,7 @@ namespace JbSystem continue; } - worker._scheduledJobsMutex.lock(); - totalJobs += static_cast(worker._scheduledJobs.size()); - worker._scheduledJobsMutex.unlock(); + totalJobs += static_cast(worker.ScheduledJobCount()); } votedWorkers++; // Increase to include main @@ -922,9 +922,9 @@ namespace JbSystem { auto& worker = _workers.at(i); - if (!worker.IsActive()) + if (!worker.IsRunning() && !worker.IsActive()) { - worker._shutdownRequested.store(false); + worker.ReleaseShutdown(); worker.Start(); } } @@ -991,32 +991,14 @@ namespace JbSystem for (size_t i = workerIndex; i < _workers.size(); i++) { auto& currentWorker = _workers.at(i); - currentWorker._jobsRequiringIgnoringMutex.lock(); - for (const auto& jobWithIgnores : currentWorker._jobsRequiringIgnoring) - { - // Do not execute the proposed job if it's forbidden by other jobs currently being executed - if (jobWithIgnores->GetIgnoreCallback()(currentJob->GetId())) - { - currentWorker._jobsRequiringIgnoringMutex.unlock(); - return false; - } - } - currentWorker._jobsRequiringIgnoringMutex.unlock(); + if (currentWorker.IsJobIgnored(currentJob)) + return false; } for (size_t i = 0; i < workerIndex; i++) { auto& currentWorker = _workers.at(i); - currentWorker._jobsRequiringIgnoringMutex.lock(); - for (const auto& jobWithIgnores : currentWorker._jobsRequiringIgnoring) - { - // Do not execute the proposed job if it's forbidden by other jobs currently being executed - if (jobWithIgnores->GetIgnoreCallback()(currentJob->GetId())) - { - currentWorker._jobsRequiringIgnoringMutex.unlock(); - return false; - } - } - currentWorker._jobsRequiringIgnoringMutex.unlock(); + if (currentWorker.IsJobIgnored(currentJob)) + return false; } return true; @@ -1031,16 +1013,14 @@ namespace JbSystem const IgnoreJobCallback& callback = currentJob->GetIgnoreCallback(); if (callback) { - const std::scoped_lock lock(worker._jobsRequiringIgnoringMutex); - worker._jobsRequiringIgnoring.emplace(currentJob); + worker.IgnoreJob(currentJob); } currentJob->Run(); if (callback) { - const std::scoped_lock lock(worker._jobsRequiringIgnoringMutex); - worker._jobsRequiringIgnoring.erase(currentJob); + worker.StopIgnoringJob(currentJob); } for (size_t i = 0; i < jobStack.size(); i++) @@ -1057,23 +1037,16 @@ namespace JbSystem void JobSystem::RunJobInNewThread(JobSystemWorker& worker, Job*& currentJob) { - const JobId jobId = currentJob->GetId(); - worker._pausedJobsMutex.lock(); - worker._pausedJobs.emplace(jobId, JobSystemWorker::PausedJob(currentJob, worker)); - worker._pausedJobsMutex.unlock(); - + worker.PauseJob(currentJob); // Exit function when job was picked up in reasonal amount of time std::chrono::high_resolution_clock::time_point startTimePoint = std::chrono::high_resolution_clock::now(); while(std::chrono::high_resolution_clock::now() - startTimePoint < std::chrono::microseconds(50)) { - worker._pausedJobsMutex.lock(); - if (!worker._pausedJobs.contains(jobId)) + if (!worker.IsJobPaused(currentJob)) { - worker._pausedJobsMutex.unlock(); return; } - worker._pausedJobsMutex.unlock(); std::this_thread::yield(); } @@ -1181,29 +1154,9 @@ namespace JbSystem MaybeHelpLowerQueue(priority); } - const JobId& id = newJob->GetId(); - - worker._modifyingThread.lock(); - worker._scheduledJobsMutex.lock(); - assert(worker._scheduledJobs.contains(id.ID())); - - if (priority == JobPriority::High) - { - worker._highPriorityTaskQueue.emplace_back(newJob); - } - - else if (priority == JobPriority::Normal) - { - worker._normalPriorityTaskQueue.emplace_back(newJob); - } - - else if (priority == JobPriority::Low) - { - worker._lowPriorityTaskQueue.emplace_back(newJob); - } + JobId id = newJob->GetId(); - worker._modifyingThread.unlock(); - worker._scheduledJobsMutex.unlock(); + worker.Schedule(newJob, priority); MaybeOptimize(); diff --git a/JobSystem/Src/JobSystem/WorkerThread.cpp b/JobSystem/Src/JobSystem/WorkerThread.cpp index c2da50d..6f7b870 100644 --- a/JobSystem/Src/JobSystem/WorkerThread.cpp +++ b/JobSystem/Src/JobSystem/WorkerThread.cpp @@ -227,6 +227,11 @@ namespace JbSystem _shutdownRequested.store(true); } + void JobSystemWorker::ReleaseShutdown() + { + _shutdownRequested.store(false); + } + bool JobSystemWorker::Busy() const { return _isBusy.load(std::memory_order_acquire); @@ -304,6 +309,11 @@ namespace JbSystem return Active.load(std::memory_order_acquire); } + bool JobSystemWorker::IsRunning() const + { + return _isRunning.load(); + } + void JobSystemWorker::WaitForShutdown() { const std::unique_lock ul(_isRunningMutex); @@ -316,21 +326,33 @@ namespace JbSystem return; } + if (std::this_thread::get_id() == _worker.get_id()) + { + return; + } + _modifyingThread.lock(); -#ifndef JBSYSTEM_KEEP_ALIVE - if (IsActive() || _shutdownRequested.load()) + if (IsRunning() || IsActive() || _shutdownRequested.load()) { _modifyingThread.unlock(); return; } +#ifndef JBSYSTEM_KEEP_ALIVE + if (_worker.get_id() != std::thread::id()) { if (_worker.joinable()) + { + _modifyingThread.unlock(); _worker.join(); + _modifyingThread.lock(); + } else + { _worker.detach(); + } } _shutdownRequested.store(false); @@ -409,6 +431,7 @@ namespace JbSystem } JobSystem::RunJob(executingWorker, pausedJob.AffectedJob); + return true; } @@ -418,7 +441,7 @@ namespace JbSystem Job* JobSystemWorker::TryTakeJob(const JobPriority& maxTimeInvestment) { - if (!_modifyingThread.try_lock()) + if (!_jobsMutex.try_lock()) { return nullptr; } @@ -429,8 +452,8 @@ namespace JbSystem { Job* value = _highPriorityTaskQueue.front(); _highPriorityTaskQueue.pop_front(); + _jobsMutex.unlock(); assert(IsJobScheduled(value->GetId())); - _modifyingThread.unlock(); return value; } } @@ -441,8 +464,8 @@ namespace JbSystem { Job* value = _normalPriorityTaskQueue.front(); _normalPriorityTaskQueue.pop_front(); + _jobsMutex.unlock(); assert(IsJobScheduled(value->GetId())); - _modifyingThread.unlock(); return value; } } @@ -453,13 +476,13 @@ namespace JbSystem { Job* value = _lowPriorityTaskQueue.front(); _lowPriorityTaskQueue.pop_front(); + _jobsMutex.unlock(); assert(IsJobScheduled(value->GetId())); - _modifyingThread.unlock(); return value; } } - _modifyingThread.unlock(); + _jobsMutex.unlock(); return nullptr; } @@ -467,7 +490,7 @@ namespace JbSystem { const int& id = jobId.ID(); - const std::scoped_lock lock(_modifyingThread); + const std::scoped_lock lock(_jobsMutex); for (const auto& highPriorityJob : _highPriorityTaskQueue) { if (highPriorityJob->GetId().ID() == id) @@ -497,10 +520,8 @@ namespace JbSystem size_t JobSystemWorker::ScheduledJobCount() { - _scheduledJobsMutex.lock(); - const size_t scheduledCount = _scheduledJobs.size(); - _scheduledJobsMutex.unlock(); - return scheduledCount; + std::scoped_lock scheduledJobsLock(_scheduledJobsMutex); + return _scheduledJobs.size(); } void JobSystemWorker::UnScheduleJob(const JobId& previouslyScheduledJob) @@ -508,24 +529,26 @@ namespace JbSystem assert(!IsJobInQueue(previouslyScheduledJob)); // In case the task is still scheduled then it wasn't removed properly const int& id = previouslyScheduledJob.ID(); - _modifyingThread.lock(); - _scheduledJobsMutex.lock(); - assert(_scheduledJobs.contains(id)); - _scheduledJobs.erase(id); - _scheduledJobsMutex.unlock(); - _modifyingThread.unlock(); + + // From this point we need some locking + { + std::scoped_lock scheduledJobsLock(_scheduledJobsMutex); + assert(_scheduledJobs.contains(id)); + _scheduledJobs.erase(id); + } } void JobSystemWorker::ScheduleJob(const JobId& jobId) { const int& id = jobId.ID(); - _modifyingThread.lock(); - _scheduledJobsMutex.lock(); - assert(!_scheduledJobs.contains(id)); - _scheduledJobs.emplace(id); - _scheduledJobsMutex.unlock(); - _modifyingThread.unlock(); + + // From this point we need some locking + { + std::scoped_lock scheduledJobsLock(_scheduledJobsMutex); + assert(!_scheduledJobs.contains(id)); + _scheduledJobs.emplace(id); + } } bool JobSystemWorker::GiveJob(Job* const& newJob, const JobPriority& priority) @@ -535,44 +558,72 @@ namespace JbSystem return false; } - _modifyingThread.lock(); - assert(_scheduledJobs.contains(newJob->GetId().ID())); - - if (priority == JobPriority::High) + // From this point we need locking for modifying thread { - _highPriorityTaskQueue.emplace_back(newJob); - } + std::scoped_lock modifyingLock(_jobsMutex); + assert(_scheduledJobs.contains(newJob->GetId().ID())); - else if (priority == JobPriority::Normal) - { - _normalPriorityTaskQueue.emplace_back(newJob); - } + if (priority == JobPriority::High) + { + _highPriorityTaskQueue.emplace_back(newJob); + } - else if (priority == JobPriority::Low) - { - _lowPriorityTaskQueue.emplace_back(newJob); + else if (priority == JobPriority::Normal) + { + _normalPriorityTaskQueue.emplace_back(newJob); + } + + else if (priority == JobPriority::Low) + { + _lowPriorityTaskQueue.emplace_back(newJob); + } } - _modifyingThread.unlock(); return true; } void JobSystemWorker::GiveFutureJob(const JobId& jobId) { - _scheduledJobsMutex.lock(); + std::scoped_lock lock(_scheduledJobsMutex); _scheduledJobs.emplace(jobId.ID()); - _scheduledJobsMutex.unlock(); } void JobSystemWorker::GiveFutureJobs(const std::vector& newjobs, int startIndex, int size) { - _scheduledJobsMutex.lock(); + std::scoped_lock lock(_scheduledJobsMutex); for (int i = 0; i < size; i++) { _scheduledJobs.emplace(newjobs[startIndex + i]->GetId().ID()); } - _scheduledJobsMutex.unlock(); + } + + void JobSystemWorker::Schedule(Job* newJob, const JobPriority& priority) + { + #ifndef NDEBUG + // In Debug builds ensure that the to be scheduled job isn't scheduled + { + std::scoped_lock scheduledJobsLock(_scheduledJobsMutex); + assert(_scheduledJobs.contains(newJob->GetId().ID())); + + } + #endif + std::scoped_lock modifyingLock(_jobsMutex); + + if (priority == JobPriority::High) + { + _highPriorityTaskQueue.emplace_back(newJob); + } + + else if (priority == JobPriority::Normal) + { + _normalPriorityTaskQueue.emplace_back(newJob); + } + + else if (priority == JobPriority::Low) + { + _lowPriorityTaskQueue.emplace_back(newJob); + } } void JobSystemWorker::FinishJob(Job*& job) @@ -589,10 +640,50 @@ namespace JbSystem bool JobSystemWorker::IsJobScheduled(const JobId& jobId) { - _scheduledJobsMutex.lock(); - const bool contains = _scheduledJobs.contains(jobId.ID()); - _scheduledJobsMutex.unlock(); - return contains; + std::scoped_lock lock(_scheduledJobsMutex); + return _scheduledJobs.contains(jobId.ID()); + } + + bool JobSystemWorker::IsJobIgnored(Job* job) + { + std::scoped_lock lock(_jobsRequiringIgnoringMutex); + for (const auto& jobWithIgnores : _jobsRequiringIgnoring) + { + // Do not execute the proposed job if it's forbidden by other jobs currently being executed + if (jobWithIgnores->GetIgnoreCallback()(job->GetId())) + { + return true; + } + } + return false; + } + + void JobSystemWorker::IgnoreJob(Job* job) + { + const std::scoped_lock lock(_jobsRequiringIgnoringMutex); + _jobsRequiringIgnoring.emplace(job); + } + + void JobSystemWorker::StopIgnoringJob(Job* job) + { + const std::scoped_lock lock(_jobsRequiringIgnoringMutex); + _jobsRequiringIgnoring.erase(job); + } + + void JobSystemWorker::PauseJob(Job* job) + { + std::scoped_lock lock(_pausedJobsMutex); + _pausedJobs.emplace(job->GetId(), JobSystemWorker::PausedJob(job, *this)); + } + + bool JobSystemWorker::IsJobPaused(Job* job) + { + std::scoped_lock lock(_pausedJobsMutex); + if (!_pausedJobs.contains(job->GetId())) + { + return false; + } + return true; } JobSystemWorker::PausedJob::PausedJob(Job* affectedJob, JobSystemWorker& worker) : AffectedJob(affectedJob), Worker(worker) diff --git a/JobSystem/Src/JobSystem/WorkerThread.h b/JobSystem/Src/JobSystem/WorkerThread.h index c16c25d..b55ddb8 100644 --- a/JobSystem/Src/JobSystem/WorkerThread.h +++ b/JobSystem/Src/JobSystem/WorkerThread.h @@ -16,8 +16,6 @@ namespace JbSystem class JobSystemWorker { - friend class JobSystem; - public: JobSystemWorker() = delete; JobSystemWorker(JobSystemWorker&&) noexcept; @@ -33,6 +31,7 @@ namespace JbSystem /// /// bool IsActive() const; + bool IsRunning() const; void WaitForShutdown(); void Start(); // Useful when thread became lost for some reason int WorkerId(); @@ -58,6 +57,8 @@ namespace JbSystem void GiveFutureJob(const JobId& jobId); void GiveFutureJobs(const std::vector& newjobs, int startIndex, int size); + void Schedule(Job* newJob, const JobPriority& priority); + /// /// Finishes job and cleans up after /// @@ -65,6 +66,12 @@ namespace JbSystem void FinishJob(Job*& job); bool IsJobScheduled(const JobId& jobId); + bool IsJobIgnored(Job* job); + void IgnoreJob(Job* job); + void StopIgnoringJob(Job* job); + + void PauseJob(Job* job); + bool IsJobPaused(Job* job); void ThreadLoop(); @@ -82,6 +89,7 @@ namespace JbSystem void CompleteAnalyticsTick(); void RequestShutdown(); + void ReleaseShutdown(); // Is the read suppost to be active std::atomic Active; @@ -114,6 +122,7 @@ namespace JbSystem std::atomic _isBusy; JbSystem::mutex _modifyingThread; + JbSystem::mutex _jobsMutex; JbSystem::mutex _scheduledJobsMutex; JbSystem::mutex _isRunningMutex; JbSystem::mutex _jobsRequiringIgnoringMutex; // DeadLock prevention