Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
markoostveen committed Jan 23, 2024
1 parent 5fc7966 commit 9d56777
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 113 deletions.
8 changes: 7 additions & 1 deletion JobSystem/Src/JobSystem/AtomicMutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ namespace JbSystem
mutex(mutex&&) = delete;
mutex& operator=(const mutex&) = delete;
mutex& operator=(mutex&&) = delete;
~mutex() noexcept { unlock(); }
~mutex() noexcept
{
if (_flag.load())
{
unlock();
}
}

bool try_lock() noexcept
{
Expand Down
81 changes: 17 additions & 64 deletions JobSystem/Src/JobSystem/JobSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ namespace JbSystem
wasActive = false;
for (JobSystemWorker& worker : _workers)
{
if (!worker._isRunning.load())
if (!worker.IsRunning())
{
continue;
}
Expand Down Expand Up @@ -714,7 +714,9 @@ namespace JbSystem

// Wait for task to complete, allocate boolean on the heap because it's possible that we do not have access to our stack
auto* finished = new (location) std::atomic<bool>(false);
auto waitLambda = [finished]() { finished->store(true); };
auto waitLambda = [finished]() {
finished->store(true);
};

ScheduleAfterJobCompletion({jobId}, maximumHelpEffort, waitLambda);
int waitingPeriod = 0;
Expand Down Expand Up @@ -842,9 +844,7 @@ namespace JbSystem
continue;
}

worker._scheduledJobsMutex.lock();
totalJobs += static_cast<int>(worker._scheduledJobs.size());
worker._scheduledJobsMutex.unlock();
totalJobs += static_cast<int>(worker.ScheduledJobCount());
}

votedWorkers++; // Increase to include main
Expand Down Expand Up @@ -922,9 +922,9 @@ namespace JbSystem
{
auto& worker = _workers.at(i);

if (!worker.IsActive())
if (!worker.IsRunning() && !worker.IsActive())
{
worker._shutdownRequested.store(false);
worker.ReleaseShutdown();
worker.Start();
}
}
Expand Down Expand Up @@ -991,32 +991,14 @@ namespace JbSystem
for (size_t i = workerIndex; i < _workers.size(); i++)
{
auto& currentWorker = _workers.at(i);
currentWorker._jobsRequiringIgnoringMutex.lock();
for (const auto& jobWithIgnores : currentWorker._jobsRequiringIgnoring)
{
// Do not execute the proposed job if it's forbidden by other jobs currently being executed
if (jobWithIgnores->GetIgnoreCallback()(currentJob->GetId()))
{
currentWorker._jobsRequiringIgnoringMutex.unlock();
return false;
}
}
currentWorker._jobsRequiringIgnoringMutex.unlock();
if (currentWorker.IsJobIgnored(currentJob))
return false;
}
for (size_t i = 0; i < workerIndex; i++)
{
auto& currentWorker = _workers.at(i);
currentWorker._jobsRequiringIgnoringMutex.lock();
for (const auto& jobWithIgnores : currentWorker._jobsRequiringIgnoring)
{
// Do not execute the proposed job if it's forbidden by other jobs currently being executed
if (jobWithIgnores->GetIgnoreCallback()(currentJob->GetId()))
{
currentWorker._jobsRequiringIgnoringMutex.unlock();
return false;
}
}
currentWorker._jobsRequiringIgnoringMutex.unlock();
if (currentWorker.IsJobIgnored(currentJob))
return false;
}

return true;
Expand All @@ -1031,16 +1013,14 @@ namespace JbSystem
const IgnoreJobCallback& callback = currentJob->GetIgnoreCallback();
if (callback)
{
const std::scoped_lock<JbSystem::mutex> lock(worker._jobsRequiringIgnoringMutex);
worker._jobsRequiringIgnoring.emplace(currentJob);
worker.IgnoreJob(currentJob);
}

currentJob->Run();

if (callback)
{
const std::scoped_lock<JbSystem::mutex> lock(worker._jobsRequiringIgnoringMutex);
worker._jobsRequiringIgnoring.erase(currentJob);
worker.StopIgnoringJob(currentJob);
}

for (size_t i = 0; i < jobStack.size(); i++)
Expand All @@ -1057,23 +1037,16 @@ namespace JbSystem

void JobSystem::RunJobInNewThread(JobSystemWorker& worker, Job*& currentJob)
{
const JobId jobId = currentJob->GetId();
worker._pausedJobsMutex.lock();
worker._pausedJobs.emplace(jobId, JobSystemWorker::PausedJob(currentJob, worker));
worker._pausedJobsMutex.unlock();

worker.PauseJob(currentJob);

// Exit function when job was picked up in reasonal amount of time
std::chrono::high_resolution_clock::time_point startTimePoint = std::chrono::high_resolution_clock::now();
while(std::chrono::high_resolution_clock::now() - startTimePoint < std::chrono::microseconds(50))
{
worker._pausedJobsMutex.lock();
if (!worker._pausedJobs.contains(jobId))
if (!worker.IsJobPaused(currentJob))
{
worker._pausedJobsMutex.unlock();
return;
}
worker._pausedJobsMutex.unlock();
std::this_thread::yield();
}

Expand Down Expand Up @@ -1181,29 +1154,9 @@ namespace JbSystem
MaybeHelpLowerQueue(priority);
}

const JobId& id = newJob->GetId();

worker._modifyingThread.lock();
worker._scheduledJobsMutex.lock();
assert(worker._scheduledJobs.contains(id.ID()));

if (priority == JobPriority::High)
{
worker._highPriorityTaskQueue.emplace_back(newJob);
}

else if (priority == JobPriority::Normal)
{
worker._normalPriorityTaskQueue.emplace_back(newJob);
}

else if (priority == JobPriority::Low)
{
worker._lowPriorityTaskQueue.emplace_back(newJob);
}
JobId id = newJob->GetId();

worker._modifyingThread.unlock();
worker._scheduledJobsMutex.unlock();
worker.Schedule(newJob, priority);

MaybeOptimize();

Expand Down
Loading

0 comments on commit 9d56777

Please sign in to comment.