
Commit

Remove boost dependencies
markoostveen committed Aug 15, 2024
1 parent c4422a8 commit a5e799a
Showing 11 changed files with 187 additions and 59 deletions.
2 changes: 1 addition & 1 deletion JobSystem/Benchmarks/FunctionCallBenchmark.cpp
@@ -145,7 +145,7 @@ double CallMultiJobHeapWorker()
}

int runIndex = 0;
JbSystem::mutex printMutex;
JbSystem::Mutex printMutex;

void SimpleCallBenchmark()
{
2 changes: 1 addition & 1 deletion JobSystem/Benchmarks/ScalingBenchmark.cpp
@@ -52,7 +52,7 @@ long long RunBenchmark()
auto masterJobs = std::make_shared<std::vector<JobId>>();
masterJobs->reserve(MasterJobs);

auto emplaceMutex = std::make_shared<JbSystem::mutex>();
auto emplaceMutex = std::make_shared<JbSystem::Mutex>();

auto scheduleJobs = JobSystem::CreateParallelJob(
0, MasterJobs, 1,
7 changes: 1 addition & 6 deletions JobSystem/Src/CMakeLists.txt
@@ -25,9 +25,4 @@ endif()
target_include_directories(JobSystem PUBLIC ${JobSystemHeaderDirectory})
set_property(TARGET JobSystem PROPERTY CXX_STANDARD 23)

target_link_libraries(JobSystem PRIVATE Threads::Threads)

find_package(Boost REQUIRED)
target_include_directories(JobSystem PUBLIC ${Boost_INCLUDE_DIRS})

#target_compile_definitions(JobSystem PUBLIC DEBUG)
target_link_libraries(JobSystem PRIVATE Threads::Threads)
14 changes: 7 additions & 7 deletions JobSystem/Src/JobSystem/AtomicMutex.h
@@ -4,15 +4,15 @@

namespace JbSystem
{
class mutex
class Mutex
{
public:
mutex() : _flag(false) {}
mutex(const mutex&) = delete;
mutex(mutex&&) = delete;
mutex& operator=(const mutex&) = delete;
mutex& operator=(mutex&&) = delete;
~mutex() noexcept { unlock(); }
Mutex() : _flag(false) {}
Mutex(const Mutex&) = delete;
Mutex(Mutex&&) = delete;
Mutex& operator=(const Mutex&) = delete;
Mutex& operator=(Mutex&&) = delete;
~Mutex() noexcept { unlock(); }

bool try_lock() noexcept
{
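
Note: the class is only renamed here (mutex becomes Mutex); its locking interface is unchanged, so it keeps working with std::scoped_lock and with plain try_lock()/unlock() calls, as the updated call sites later in this commit show. A minimal usage sketch, assuming the header is "AtomicMutex.h"; the Guarded struct and its counter are illustrative only, not part of the repository:

#include "AtomicMutex.h"

#include <mutex>

struct Guarded
{
    JbSystem::Mutex guard; // renamed from JbSystem::mutex in this commit
    int counter = 0;

    void Increment()
    {
        // Blocking acquisition, as the updated call sites in
        // JobSystem.cpp and WorkerThread.cpp do with std::scoped_lock
        const std::scoped_lock<JbSystem::Mutex> lock(guard);
        ++counter;
    }

    bool TryIncrement()
    {
        // Non-blocking acquisition via the try_lock() shown above
        if (!guard.try_lock())
        {
            return false;
        }
        ++counter;
        guard.unlock();
        return true;
    }
};
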
74 changes: 49 additions & 25 deletions JobSystem/Src/JobSystem/JobSystem.cpp
@@ -1,36 +1,64 @@
#include "JobSystem.h"

#include <algorithm>
#include <functional>
#include <cstdint>
#include <iostream>

#include "boost/container/small_vector.hpp"
#include <boost/range/adaptor/reversed.hpp>

#include <string>
#include <ranges>

namespace JbSystem
{

// Thread locals
static thread_local std::uint16_t randomWorkerIndex;


// Prevent wild recursion patterns
const int maxThreadDepth = 5;
using JobStack = std::array<const Job*, maxThreadDepth + 1>;

static thread_local unsigned int threadDepth = 0; // recursion guard, threads must not be able to infinitely go into scopes
static thread_local boost::container::small_vector<const Job*, sizeof(const Job*) * maxThreadDepth>
jobStack; // stack of all jobs our current thread is executing
static thread_local JobStack jobStack; // stack of all jobs our current thread is executing
static thread_local uint8_t jobStackSize = 0; // To track the current size of the stack
static thread_local bool allowedToLowerQueue = true;
static thread_local unsigned int maybeLowerWorkDepth = 0;

// Control Optimization cycles
const int maxOptimizeInCycles = maxThreadDepth * 10;
static thread_local int optimizeInCycles = 0;


void jobStackPushJob(const Job* job) {
if (jobStackSize < maxThreadDepth) {
jobStack[jobStackSize++] = job;
} else {
// Handle stack overflow if necessary
// For example, throw an exception or log an error
}
}

void jobStackPopJob() {
if (jobStackSize > 0) {
--jobStackSize;
} else {
// Handle stack underflow if necessary
// For example, throw an exception or log an error
}
}

const Job* jobStackCurrentJob() {
if (jobStackSize > 0) {
return jobStack[jobStackSize - 1];
}
return nullptr; // No job currently being executed
}

bool JobInStack(const JobId& jobId)
{
for (const auto& job : jobStack)
for(uint8_t i = 0; i < jobStackSize; i++)
{
const auto& job = jobStack[i];

if (job->GetId() == jobId)
{
return true;
@@ -41,8 +69,10 @@ namespace JbSystem

bool IsProposedJobIgnoredByJobStack(const JobId& proposedJob)
{
for (const auto& job : jobStack)
for(uint8_t i = 0; i < jobStackSize; i++)
{
const auto& job = jobStack[i];

if (job->GetIgnoreCallback() == nullptr)
{
continue;
@@ -233,7 +263,7 @@ namespace JbSystem
ExecuteJob(JobPriority::Low); // Help complete the remaining jobs

wasActive = false;
for (JobSystemWorker& worker : boost::adaptors::reverse(_workers))
for (JobSystemWorker& worker : _workers | std::ranges::views::reverse)
{
if (!worker.IsActive())
{
@@ -367,9 +397,9 @@ namespace JbSystem
struct VoidJobTag
{
};
void* location = boost::singleton_pool<VoidJobTag, sizeof(JobSystemVoidJob)>::malloc();
void* location = MemoryPool<VoidJobTag, JobSystemVoidJob>::Get().Alloc();
auto destructorCallback = [](JobSystemVoidJob* const& job)
{ boost::singleton_pool<VoidJobTag, sizeof(JobSystemVoidJob)>::free(job); };
{ MemoryPool<VoidJobTag, JobSystemVoidJob>::Get().Free(job); };

return new (location) JobSystemVoidJob(function, destructorCallback);
}
@@ -616,7 +646,7 @@ namespace JbSystem
struct FinishedTag
{
};
void* location = boost::singleton_pool<FinishedTag, sizeof(std::atomic<bool>)>::malloc();
void* location = MemoryPool<FinishedTag, std::atomic<bool>>::Get().Alloc();

// Wait for task to complete, allocate boolean on the heap because it's possible that we do not have access to our stack
auto* finished = new (location) std::atomic<bool>(false);
@@ -660,7 +690,7 @@ namespace JbSystem
}
}

boost::singleton_pool<FinishedTag, sizeof(std::atomic<bool>)>::free(finished);
MemoryPool<FinishedTag, std::atomic<bool>>::Get().Free(finished);
threadDepth++;
}

@@ -920,31 +950,25 @@ namespace JbSystem
{
assert(!JobInStack(currentJob->GetId()));

jobStack.emplace_back(currentJob);

jobStackPushJob(currentJob);

const IgnoreJobCallback& callback = currentJob->GetIgnoreCallback();
if (callback)
{
const std::scoped_lock<JbSystem::mutex> lock(worker._jobsRequiringIgnoringMutex);
const std::scoped_lock<JbSystem::Mutex> lock(worker._jobsRequiringIgnoringMutex);
worker._jobsRequiringIgnoring.emplace(currentJob);
}

currentJob->Run();

if (callback)
{
const std::scoped_lock<JbSystem::mutex> lock(worker._jobsRequiringIgnoringMutex);
const std::scoped_lock<JbSystem::Mutex> lock(worker._jobsRequiringIgnoringMutex);
worker._jobsRequiringIgnoring.erase(currentJob);
}

for (size_t i = 0; i < jobStack.size(); i++)
{
if (jobStack.at(i)->GetId() == currentJob->GetId())
{
jobStack.erase(jobStack.begin() + i);
break;
}
}
jobStackPopJob();

worker.FinishJob(currentJob);
}
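
Note: boost::adaptors::reverse is replaced by the standard C++20 adaptor std::ranges::views::reverse, which works on any bidirectional range such as the std::vector<JobSystemWorker> _workers member. A standalone sketch of the drop-in substitution; the integer vector merely stands in for _workers:

#include <iostream>
#include <ranges>
#include <vector>

int main()
{
    std::vector<int> workers{1, 2, 3, 4};

    // Same back-to-front traversal that boost::adaptors::reverse(_workers) provided
    for (const int worker : workers | std::ranges::views::reverse)
    {
        std::cout << worker << ' '; // prints: 4 3 2 1
    }
    std::cout << '\n';
}
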
18 changes: 9 additions & 9 deletions JobSystem/Src/JobSystem/JobSystem.h
@@ -2,8 +2,8 @@

#include "Job.h"
#include "WorkerThread.h"
#include "MemoryPool.h"

#include "boost/pool/singleton_pool.hpp"

#include <array>
#include <atomic>
@@ -249,7 +249,7 @@ namespace JbSystem
int _workerCount = 0;
std::vector<JobSystemWorker> _workers;

JbSystem::mutex _optimizePerformance;
JbSystem::Mutex _optimizePerformance;

const int _maxJobExecutionsBeforePerformanceOptimization = 10;
std::atomic<int> _jobExecutionsTillOptimization = _maxJobExecutionsBeforePerformanceOptimization;
@@ -259,7 +259,7 @@ namespace JbSystem
std::atomic<bool> _showStats;

// Deadlock prevention
JbSystem::mutex _spawnedThreadsMutex;
JbSystem::Mutex _spawnedThreadsMutex;
std::unordered_map<std::thread::id, std::thread> _spawnedThreadsExecutingIgnoredJobs;
};

@@ -269,15 +269,15 @@ namespace JbSystem
{
using FunctionType = std::remove_const_t<std::remove_reference_t<decltype(function)>>;

void* location = boost::singleton_pool <
void* location = MemoryPool<
typename JobSystemWithParametersJob<FunctionType, Args...>::Tag,
sizeof(JobSystemWithParametersJob<FunctionType, Args...>)>::malloc();
JobSystemWithParametersJob<FunctionType, Args...>>::Get().Alloc();
auto deconstructorCallback = [](JobSystemWithParametersJob<FunctionType, Args...>* const& job)
{
job->~JobSystemWithParametersJob();
boost::singleton_pool<
MemoryPool<
typename JobSystemWithParametersJob<FunctionType, Args...>::Tag,
sizeof(JobSystemWithParametersJob<FunctionType, Args...>)>::free(
JobSystemWithParametersJob<FunctionType, Args...>>::Get().Free(
job);
};
return new (location) JobSystemWithParametersJob<FunctionType, Args...>(
@@ -427,10 +427,10 @@ namespace JbSystem
callback->Run();
callback->Free();
dependencies->~vector();
boost::singleton_pool<DependenciesTag, sizeof(std::vector<JobId>)>::free(dependencies);
MemoryPool<DependenciesTag, std::vector<JobId>>::Get().Free(dependencies);
};

void* location = boost::singleton_pool<DependenciesTag, sizeof(std::vector<JobId>)>::malloc();
void* location = MemoryPool<DependenciesTag, std::vector<JobId>>::Get().Alloc();
auto* jobDependencies = new (location) std::vector<JobId>({dependencies});

Job* callbackJob = JobSystem::CreateJobWithParams(function, std::forward<Args>(args)...);
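
Note: each distinct (Tag, T) pair instantiates its own MemoryPool, because Get() (defined in MemoryPool.h below) returns a function-local static per template instantiation; this is why the code keeps separate tags such as VoidJobTag, FinishedTag, DependenciesTag and the per-job Tag. A small illustration, assuming MemoryPool.h compiles standalone; TagA and TagB are hypothetical:

#include "MemoryPool.h"

#include <cassert>

// TagA and TagB are illustrative only; the repository uses tags such as
// VoidJobTag, FinishedTag and DependenciesTag for the same purpose.
struct TagA {};
struct TagB {};

void TagSeparationExample()
{
    using JbSystem::MemoryPool;

    // Same element type, different tags: two independent singleton pools
    const void* poolA = &MemoryPool<TagA, int>::Get();
    const void* poolB = &MemoryPool<TagB, int>::Get();
    assert(poolA != poolB);

    int* a = MemoryPool<TagA, int>::Get().Alloc();
    MemoryPool<TagA, int>::Get().Free(a);
}
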
112 changes: 112 additions & 0 deletions JobSystem/Src/JobSystem/MemoryPool.h
@@ -0,0 +1,112 @@
#pragma once

#include "AtomicMutex.h"

#include <mutex>
#include <thread>

#define DEFAULT_MEMPOOL_SIZE 16

namespace JbSystem{

template <typename Tag/* Used to create more memory pools local to a specific combination of parameters, this will reduce pressure */, typename T> struct MemoryPool
{
private:
uint32_t size;
uint32_t capacity;

// Contains large memory blocks
// This is where the actual memory lives.
std::vector<T*> markers;

// Contains the available memory addresses
// When we need a new memory address, we will
// select the address available at the top of this stack.
T** memstack;

JbSystem::Mutex mutex;

public:
MemoryPool(uint32_t size = DEFAULT_MEMPOOL_SIZE) :
size(0),
capacity(std::max(size, (uint32_t)DEFAULT_MEMPOOL_SIZE))
{
T* new_block = (T*)calloc(capacity, sizeof(T));
markers.push_back(new_block);

memstack = (T**)calloc(capacity, sizeof(T*));
// Fill the stack with the available memory addresses.
for (uint32_t i = 0; i < capacity; i++)
{
memstack[i] = new_block + i;
}
}

~MemoryPool()
{
for (uint32_t i = 0; i < markers.size(); i++)
{
free(markers[i]);
}

free(memstack);
}

T* Alloc()
{
std::lock_guard lock(mutex);
if (size == capacity)
{
// Free the old stack of addresses
free(memstack);

// Allocated memory has filled, reallocate memory
memstack = (T**)calloc(2 * capacity, sizeof(T*));

T* newBlock = (T*)calloc(capacity, sizeof(T));

// Keep track of the large memory blocks for destructor
markers.push_back(newBlock);

// Record the newly available addresses in the stack
// Note that we don't care about the older addresses
// since they are already allocated and given out.
for (uint32_t i = 0; i < capacity; i++)
{
memstack[capacity + i] = newBlock + i;
}

capacity *= 2;
}

T* next = memstack[size++];
memset(next, 0, sizeof(T));
return next;
}

void Free(T* mem)
{
std::lock_guard lock(mutex);

// mem location is now available
// Add that at the top of the stack
memstack[--size] = mem;
}

int Size() {
std::lock_guard lock(mutex);

return size;
}
int Capacity() {
std::lock_guard lock(mutex);

return capacity;
}

static MemoryPool& Get(){
static MemoryPool pool;
return pool;
}
};
}
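
Note: the pool hands out raw zeroed storage, so callers placement-new into the returned pointer and invoke the destructor themselves before handing the slot back, which is the cycle the updated allocation sites above follow with their destructor callbacks. A minimal sketch of one round trip, assuming MemoryPool.h is includable on its own; Widget and WidgetTag are illustrative only:

#include "MemoryPool.h"

#include <new>
#include <string>

struct WidgetTag {};

struct Widget
{
    std::string name;
    int value = 0;
};

void PoolRoundTrip()
{
    auto& pool = JbSystem::MemoryPool<WidgetTag, Widget>::Get();

    // 1. Take a raw slot from the pool (zeroed, no constructor has run yet)
    void* location = pool.Alloc();

    // 2. Construct the object in place
    auto* widget = new (location) Widget{"example", 42};

    // ... use widget ...

    // 3. Run the destructor manually, then return the slot to the pool
    widget->~Widget();
    pool.Free(widget);
}
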
2 changes: 1 addition & 1 deletion JobSystem/Src/JobSystem/WorkerThread.cpp
@@ -308,7 +308,7 @@ namespace JbSystem
{

const int& id = jobId.ID();
const std::scoped_lock<JbSystem::mutex> lock(_modifyingThread);
const std::scoped_lock<JbSystem::Mutex> lock(_modifyingThread);
for (const auto& highPriorityJob : _highPriorityTaskQueue)
{
if (highPriorityJob->GetId().ID() == id)