From e7871e3a9062e74a5b06dc7359e5a79b9ab4c2f9 Mon Sep 17 00:00:00 2001 From: philipportner Date: Thu, 31 Oct 2024 21:50:30 +0100 Subject: [PATCH 1/7] [#731] cleanup queue mode magic numbers --- src/runtime/local/vectorized/MTWrapper.h | 13 ++++++------- src/runtime/local/vectorized/MTWrapper_dense.cpp | 2 +- src/runtime/local/vectorized/WorkerCPU.h | 14 +++++++------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/runtime/local/vectorized/MTWrapper.h b/src/runtime/local/vectorized/MTWrapper.h index 81d1883a7..ff93cd8f2 100644 --- a/src/runtime/local/vectorized/MTWrapper.h +++ b/src/runtime/local/vectorized/MTWrapper.h @@ -51,9 +51,7 @@ template class MTWrapperBase { size_t _numThreads{}; uint32_t _numCPPThreads{}; uint32_t _numCUDAThreads{}; - int _queueMode; - // _queueMode 0: Centralized queue for all workers, 1: One queue for every - // physical ID (socket), 2: One queue per CPU + QueueTypeOption _queueMode; int _numQueues; int _stealLogic; int _totalNumaDomains; @@ -120,7 +118,8 @@ template class MTWrapperBase { } void initCPPWorkers(std::vector &qvector, uint32_t batchSize, const bool verbose = false, - int numQueues = 0, int queueMode = 0, bool pinWorkers = false) { + int numQueues = 0, QueueTypeOption queueMode = QueueTypeOption::CENTRALIZED, + bool pinWorkers = false) { cpp_workers.resize(_numCPPThreads); if (numQueues == 0) { throw std::runtime_error("MTWrapper::initCPPWorkers: numQueues is " @@ -205,7 +204,7 @@ template class MTWrapperBase { if (ctx && ctx->useCUDA() && numFunctions > 1) _numCUDAThreads = ctx->cuda_contexts.size(); - _queueMode = 0; + _queueMode = QueueTypeOption::CENTRALIZED; _numQueues = 1; _stealLogic = _ctx->getUserConfig().victimSelection; if (std::thread::hardware_concurrency() < topologyUniqueThreads.size() && _ctx->config.hyperthreadingEnabled) @@ -214,10 +213,10 @@ template class MTWrapperBase { _totalNumaDomains = std::set(topologyPhysicalIds.begin(), topologyPhysicalIds.end()).size(); if (_ctx->getUserConfig().queueSetupScheme == PERGROUP) { - _queueMode = 1; + _queueMode = QueueTypeOption::PERGROUP; _numQueues = _totalNumaDomains; } else if (_ctx->getUserConfig().queueSetupScheme == PERCPU) { - _queueMode = 2; + _queueMode = QueueTypeOption::PERCPU; _numQueues = _numCPPThreads; } diff --git a/src/runtime/local/vectorized/MTWrapper_dense.cpp b/src/runtime/local/vectorized/MTWrapper_dense.cpp index ffed73054..2ff28d4b0 100644 --- a/src/runtime/local/vectorized/MTWrapper_dense.cpp +++ b/src/runtime/local/vectorized/MTWrapper_dense.cpp @@ -37,7 +37,7 @@ template std::vector tmp_q{q.get()}; auto batchSize8M = std::max(100ul, static_cast(std::ceil(8388608 / row_mem))); - this->initCPPWorkers(tmp_q, batchSize8M, verbose, 1, 0, false); + this->initCPPWorkers(tmp_q, batchSize8M, verbose, 1, QueueTypeOption::CENTRALIZED, false); #ifdef USE_CUDA if (this->_numCUDAThreads) { diff --git a/src/runtime/local/vectorized/WorkerCPU.h b/src/runtime/local/vectorized/WorkerCPU.h index fda1cc73c..7ec61728b 100644 --- a/src/runtime/local/vectorized/WorkerCPU.h +++ b/src/runtime/local/vectorized/WorkerCPU.h @@ -31,7 +31,7 @@ class WorkerCPU : public Worker { uint32_t _batchSize; int _threadID; int _numQueues; - int _queueMode; + QueueTypeOption _queueMode; int _stealLogic; bool _pinWorkers; @@ -39,7 +39,7 @@ class WorkerCPU : public Worker { // ToDo: remove compile-time verbose parameter and use logger WorkerCPU(std::vector deques, std::vector physical_ids, std::vector unique_threads, DCTX(dctx), bool verbose, uint32_t fid = 0, uint32_t batchSize = 100, 
int threadID = 0, int numQueues = 0, - int queueMode = 0, int stealLogic = 0, bool pinWorkers = 0) + QueueTypeOption queueMode = QueueTypeOption::CENTRALIZED, int stealLogic = 0, bool pinWorkers = 0) : Worker(dctx), _q(deques), _physical_ids(physical_ids), _unique_threads(unique_threads), _verbose(verbose), _fid(fid), _batchSize(batchSize), _threadID(threadID), _numQueues(numQueues), _queueMode(queueMode), _stealLogic(stealLogic), _pinWorkers(pinWorkers) { @@ -60,11 +60,11 @@ class WorkerCPU : public Worker { int currentDomain = _physical_ids[_threadID]; int targetQueue = _threadID; - if (_queueMode == 0) { + if (_queueMode == QueueTypeOption::CENTRALIZED) { targetQueue = 0; - } else if (_queueMode == 1) { + } else if (_queueMode == QueueTypeOption::PERGROUP) { targetQueue = currentDomain; - } else if (_queueMode == 2) { + } else if (_queueMode == QueueTypeOption::PERCPU) { targetQueue = _threadID; } else { ctx->logger->error("WorkerCPU: queue not found"); @@ -103,7 +103,7 @@ class WorkerCPU : public Worker { } } else if (_stealLogic == 1) { // Stealing in sequential order from same domain first - if (_queueMode == 2) { + if (_queueMode == QueueTypeOption::PERCPU) { targetQueue = (targetQueue + 1) % _numQueues; while (targetQueue != startingQueue) { @@ -163,7 +163,7 @@ class WorkerCPU : public Worker { queuesThisDomain++; } } - if (_queueMode == 2) { + if (_queueMode == QueueTypeOption::PERCPU) { while (std::accumulate(eofWorkers.begin(), eofWorkers.end(), 0) < queuesThisDomain) { targetQueue = rand() % _numQueues; if (_physical_ids[targetQueue] == currentDomain) { From 38e0f0fd71228f04a1f3cfd8ef4b3ad6be11e156 Mon Sep 17 00:00:00 2001 From: philipportner Date: Thu, 31 Oct 2024 23:19:32 +0100 Subject: [PATCH 2/7] [#731] scheduling options changed to enum class This patch changes the scheduling options available in LoadPartitioningDefs.h to be of type `enum class` instead of `enum`. With that, usage of these options is considerably improved. 
- stronger type safety - prevents accidental comparisons with integer or other enum types - no implicit conversion to integers --- .../local/vectorized/LoadPartitioningDefs.h | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/runtime/local/vectorized/LoadPartitioningDefs.h b/src/runtime/local/vectorized/LoadPartitioningDefs.h index 7b71cd55b..ab26312a4 100644 --- a/src/runtime/local/vectorized/LoadPartitioningDefs.h +++ b/src/runtime/local/vectorized/LoadPartitioningDefs.h @@ -16,23 +16,23 @@ #pragma once -enum QueueTypeOption { CENTRALIZED = 0, PERGROUP, PERCPU }; +enum class QueueTypeOption { CENTRALIZED, PERGROUP, PERCPU }; -enum VictimSelectionLogic { SEQ = 0, SEQPRI, RANDOM, RANDOMPRI }; +enum class VictimSelectionLogic { SEQ, SEQPRI, RANDOM, RANDOMPRI }; -enum SelfSchedulingScheme { - STATIC = 0, - SS, - GSS, - TSS, - FAC2, - TFSS, - FISS, - VISS, - PLS, +enum class SelfSchedulingScheme { + INVALID = -1, + STATIC, + SS, // self-scheduling + GSS, // guided self-scheduling + TSS, // trapezoid self-scheduling + FAC2, // factoring + TFSS, // trapezoid factoring self-scheduling (TFSS) + FISS, // fixed increase self-scheduling + VISS, // variable increase self-scheduling + PLS, // performance-based loop self-scheduling + PSS, // probabilistic self-scheduling MSTATIC, - MFSC, - PSS, + MFSC, // modified fixed-size chunk self-scheduling AUTO, - INVALID = -1 /* only for JSON enum conversion */ }; From 4be7dfa3a0660612b5bf8b87e738d880b13128cf Mon Sep 17 00:00:00 2001 From: philipportner Date: Thu, 31 Oct 2024 23:25:37 +0100 Subject: [PATCH 3/7] [#731] refactor scheduling code to use enum class This patch refactors the code in the vectorized pipeline that uses the scheduling options to use the enum class types instead of hard-coded integers. Closes #731 --- src/api/cli/DaphneUserConfig.h | 6 ++-- src/api/internal/daphne_internal.cpp | 5 ++++ src/parser/config/ConfigParser.h | 26 ++++++++-------- .../local/vectorized/LoadPartitioning.h | 30 +++++++++---------- src/runtime/local/vectorized/MTWrapper.h | 27 +++++++++-------- .../local/vectorized/MTWrapper_dense.cpp | 20 ++++++------- .../local/vectorized/MTWrapper_sparse.cpp | 10 +++---- src/runtime/local/vectorized/WorkerCPU.h | 15 +++++----- .../vectorized/MultiThreadedKernelTest.cpp | 2 +- 9 files changed, 75 insertions(+), 66 deletions(-) diff --git a/src/api/cli/DaphneUserConfig.h b/src/api/cli/DaphneUserConfig.h index d33a18e30..13bfdf4b1 100644 --- a/src/api/cli/DaphneUserConfig.h +++ b/src/api/cli/DaphneUserConfig.h @@ -78,9 +78,9 @@ struct DaphneUserConfig { bool force_cuda = false; - SelfSchedulingScheme taskPartitioningScheme = STATIC; - QueueTypeOption queueSetupScheme = CENTRALIZED; - VictimSelectionLogic victimSelection = SEQPRI; + SelfSchedulingScheme taskPartitioningScheme = SelfSchedulingScheme::STATIC; + QueueTypeOption queueSetupScheme = QueueTypeOption::CENTRALIZED; + VictimSelectionLogic victimSelection = VictimSelectionLogic::SEQPRI; ALLOCATION_TYPE distributedBackEndSetup = ALLOCATION_TYPE::DIST_MPI; // default value size_t max_distributed_serialization_chunk_size = std::numeric_limits::max() - 1024; // 2GB (-1KB to make up for gRPC headers etc.)
- which is the diff --git a/src/api/internal/daphne_internal.cpp b/src/api/internal/daphne_internal.cpp index 5b533601b..b63be6f0b 100644 --- a/src/api/internal/daphne_internal.cpp +++ b/src/api/internal/daphne_internal.cpp @@ -164,6 +164,9 @@ int startDAPHNE(int argc, const char **argv, DaphneLibResult *daphneLibRes, int init("")); // Scheduling options + using enum SelfSchedulingScheme; + using enum QueueTypeOption; + using enum VictimSelectionLogic; static opt taskPartitioningScheme( "partitioning", cat(schedulingOptions), desc("Choose task partitioning scheme:"), @@ -179,11 +182,13 @@ int startDAPHNE(int argc, const char **argv, DaphneLibResult *daphneLibRes, int "i.e., MFSC does not require profiling information as FSC"), clEnumVal(PSS, "Probabilistic self-scheduling"), clEnumVal(AUTO, "Automatic partitioning")), init(STATIC)); + static opt queueSetupScheme( "queue_layout", cat(schedulingOptions), desc("Choose queue setup scheme:"), values(clEnumVal(CENTRALIZED, "One queue (default)"), clEnumVal(PERGROUP, "One queue per CPU group"), clEnumVal(PERCPU, "One queue per CPU core")), init(CENTRALIZED)); + static opt victimSelection( "victim_selection", cat(schedulingOptions), desc("Choose work stealing victim selection logic:"), values(clEnumVal(SEQ, "Steal from next adjacent worker (default)"), diff --git a/src/parser/config/ConfigParser.h b/src/parser/config/ConfigParser.h index 111cb8191..04638a05a 100644 --- a/src/parser/config/ConfigParser.h +++ b/src/parser/config/ConfigParser.h @@ -22,19 +22,19 @@ #include // must be in the same namespace as the enum SelfSchedulingScheme -NLOHMANN_JSON_SERIALIZE_ENUM(SelfSchedulingScheme, {{INVALID, nullptr}, - {STATIC, "STATIC"}, - {SS, "SS"}, - {GSS, "GSS"}, - {TSS, "TSS"}, - {FAC2, "FAC2"}, - {TFSS, "TFSS"}, - {FISS, "FISS"}, - {VISS, "VISS"}, - {PLS, "PLS"}, - {MSTATIC, "MSTATIC"}, - {MFSC, "MFSC"}, - {PSS, "PSS"}}) +NLOHMANN_JSON_SERIALIZE_ENUM(SelfSchedulingScheme, {{SelfSchedulingScheme::INVALID, nullptr}, + {SelfSchedulingScheme::STATIC, "STATIC"}, + {SelfSchedulingScheme::SS, "SS"}, + {SelfSchedulingScheme::GSS, "GSS"}, + {SelfSchedulingScheme::TSS, "TSS"}, + {SelfSchedulingScheme::FAC2, "FAC2"}, + {SelfSchedulingScheme::TFSS, "TFSS"}, + {SelfSchedulingScheme::FISS, "FISS"}, + {SelfSchedulingScheme::VISS, "VISS"}, + {SelfSchedulingScheme::PLS, "PLS"}, + {SelfSchedulingScheme::MSTATIC, "MSTATIC"}, + {SelfSchedulingScheme::MFSC, "MFSC"}, + {SelfSchedulingScheme::PSS, "PSS"}}) class ConfigParser { public: diff --git a/src/runtime/local/vectorized/LoadPartitioning.h b/src/runtime/local/vectorized/LoadPartitioning.h index 1b18fa95a..c9609bee6 100644 --- a/src/runtime/local/vectorized/LoadPartitioning.h +++ b/src/runtime/local/vectorized/LoadPartitioning.h @@ -26,7 +26,7 @@ class LoadPartitioning { private: - int schedulingMethod; + SelfSchedulingScheme schedulingMethod; uint64_t totalTasks; uint64_t chunkParam; uint64_t scheduledTasks; @@ -52,7 +52,7 @@ class LoadPartitioning { } public: - LoadPartitioning(int method, uint64_t tasks, uint64_t chunk, uint32_t workers, bool autoChunk) { + LoadPartitioning(SelfSchedulingScheme method, uint64_t tasks, uint64_t chunk, uint32_t workers, bool autoChunk) { schedulingMethod = method; totalTasks = tasks; double tSize = (totalTasks + workers - 1.0) / workers; @@ -64,7 +64,7 @@ class LoadPartitioning { // calculate expertChunk int mul = log2(totalTasks / workers) * 0.618; chunkParam = (totalTasks) / ((2 << mul) * workers); - method = SS; + method = SelfSchedulingScheme::SS; if (chunkParam < 1) { chunkParam = 
1; } @@ -81,32 +81,32 @@ class LoadPartitioning { uint64_t getNextChunk() { uint64_t chunkSize = 0; switch (schedulingMethod) { - case STATIC: { // STATIC + case SelfSchedulingScheme::STATIC: { chunkSize = std::ceil(totalTasks / totalWorkers); break; } - case SS: { // self-scheduling (SS) + case SelfSchedulingScheme::SS: { chunkSize = 1; break; } - case GSS: { // guided self-scheduling (GSS) + case SelfSchedulingScheme::GSS: { chunkSize = (uint64_t)ceil((double)remainingTasks / totalWorkers); break; } - case TSS: { // trapezoid self-scheduling (TSS) + case SelfSchedulingScheme::TSS: { chunkSize = tssChunk - tssDelta * schedulingStep; break; } - case FAC2: { // factoring (FAC2) + case SelfSchedulingScheme::FAC2: { uint64_t actualStep = schedulingStep / totalWorkers; // has to be an integer division chunkSize = (uint64_t)ceil(pow(0.5, actualStep + 1) * (totalTasks / totalWorkers)); break; } - case TFSS: { // trapezoid factoring self-scheduling (TFSS) + case SelfSchedulingScheme::TFSS: { chunkSize = (uint64_t)ceil((double)remainingTasks / ((double)2.0 * totalWorkers)); break; } - case FISS: { // fixed increase self-scheduling (FISS) + case SelfSchedulingScheme::FISS: { // TODO uint64_t X = fissStages + 2; uint64_t initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); @@ -116,14 +116,14 @@ class LoadPartitioning { (fissStages - 1))); // chunksize with increment after init break; } - case VISS: { // variable increase self-scheduling (VISS) + case SelfSchedulingScheme::VISS: { // TODO uint64_t schedulingStepnew = schedulingStep / totalWorkers; uint64_t initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); chunkSize = initChunk * (uint64_t)ceil((double)(1 - pow(0.5, schedulingStepnew)) / 0.5); break; } - case PLS: { // performance-based loop self-scheduling (PLS) + case SelfSchedulingScheme::PLS: { // TODO double SWR = 0.5; // static workload ratio if (remainingTasks > totalTasks - (totalTasks * SWR)) { @@ -133,7 +133,7 @@ class LoadPartitioning { } break; } - case PSS: { // probabilistic self-scheduling (PSS) + case SelfSchedulingScheme::PSS: { // probabilistic self-scheduling (PSS) // E[P] is the average number of idle processor, for now we use // still totalWorkers double averageIdleProc = (double)totalWorkers; @@ -141,7 +141,7 @@ class LoadPartitioning { // TODO break; } - case MFSC: { // modifed fixed-size chunk self-scheduling (MFSC) + case SelfSchedulingScheme::MFSC: { // modifed fixed-size chunk self-scheduling (MFSC) chunkSize = mfscChunk; break; } @@ -157,4 +157,4 @@ class LoadPartitioning { remainingTasks -= chunkSize; return chunkSize; } -}; \ No newline at end of file +}; diff --git a/src/runtime/local/vectorized/MTWrapper.h b/src/runtime/local/vectorized/MTWrapper.h index ff93cd8f2..2aa50fadc 100644 --- a/src/runtime/local/vectorized/MTWrapper.h +++ b/src/runtime/local/vectorized/MTWrapper.h @@ -53,7 +53,7 @@ template class MTWrapperBase { uint32_t _numCUDAThreads{}; QueueTypeOption _queueMode; int _numQueues; - int _stealLogic; + VictimSelectionLogic _victimSelection; int _totalNumaDomains; DCTX(_ctx); @@ -85,16 +85,19 @@ template class MTWrapperBase { uniqueThreads.push_back(obj->children[i]->os_index); switch (_ctx->getUserConfig().queueSetupScheme) { - case CENTRALIZED: { + case QueueTypeOption::CENTRALIZED: { responsibleThreads.push_back(0); - } break; - case PERGROUP: { + break; + } + case QueueTypeOption::PERGROUP: { if (responsibleThreads.size() == parent_package_id) responsibleThreads.push_back(obj->children[0]->os_index); - } 
break; - case PERCPU: { + break; + } + case QueueTypeOption::PERCPU: { responsibleThreads.push_back(obj->os_index); - } break; + break; + } } } } @@ -129,7 +132,7 @@ template class MTWrapperBase { int i = 0; for (auto &w : cpp_workers) { w = std::make_unique(qvector, topologyPhysicalIds, topologyUniqueThreads, _ctx, verbose, 0, - batchSize, i, numQueues, queueMode, this->_stealLogic, pinWorkers); + batchSize, i, numQueues, queueMode, this->_victimSelection, pinWorkers); i++; } } @@ -190,7 +193,7 @@ template class MTWrapperBase { else _numCPPThreads = topologyPhysicalIds.size(); - if (_ctx->getUserConfig().queueSetupScheme != CENTRALIZED) + if (_ctx->getUserConfig().queueSetupScheme != QueueTypeOption::CENTRALIZED) _numCPPThreads = topologyUniqueThreads.size(); // If the available CPUs from Slurm is less than the configured num @@ -206,16 +209,16 @@ template class MTWrapperBase { _queueMode = QueueTypeOption::CENTRALIZED; _numQueues = 1; - _stealLogic = _ctx->getUserConfig().victimSelection; + _victimSelection = _ctx->getUserConfig().victimSelection; if (std::thread::hardware_concurrency() < topologyUniqueThreads.size() && _ctx->config.hyperthreadingEnabled) topologyUniqueThreads.resize(_numCPPThreads); _numThreads = _numCPPThreads + _numCUDAThreads; _totalNumaDomains = std::set(topologyPhysicalIds.begin(), topologyPhysicalIds.end()).size(); - if (_ctx->getUserConfig().queueSetupScheme == PERGROUP) { + if (_ctx->getUserConfig().queueSetupScheme == QueueTypeOption::PERGROUP) { _queueMode = QueueTypeOption::PERGROUP; _numQueues = _totalNumaDomains; - } else if (_ctx->getUserConfig().queueSetupScheme == PERCPU) { + } else if (_ctx->getUserConfig().queueSetupScheme == QueueTypeOption::PERCPU) { _queueMode = QueueTypeOption::PERCPU; _numQueues = _numCPPThreads; } diff --git a/src/runtime/local/vectorized/MTWrapper_dense.cpp b/src/runtime/local/vectorized/MTWrapper_dense.cpp index 2ff28d4b0..ea1dd49da 100644 --- a/src/runtime/local/vectorized/MTWrapper_dense.cpp +++ b/src/runtime/local/vectorized/MTWrapper_dense.cpp @@ -57,14 +57,14 @@ template // create tasks and close input uint64_t startChunk = 0; uint64_t endChunk = 0; - int method = ctx->config.taskPartitioningScheme; + SelfSchedulingScheme schedulingScheme = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) chunkParam = 1; bool autoChunk = false; - if (method == AUTO) + if (schedulingScheme == SelfSchedulingScheme::AUTO) autoChunk = true; - LoadPartitioning lp(method, len, chunkParam, this->_numThreads, autoChunk); + LoadPartitioning lp(schedulingScheme, len, chunkParam, this->_numThreads, autoChunk); while (lp.hasNextChunk()) { endChunk += lp.getNextChunk(); q->enqueueTask(new CompiledPipelineTask>( @@ -122,7 +122,7 @@ template uint64_t endChunk = 0; uint64_t currentItr = 0; uint64_t target; - int method = ctx->config.taskPartitioningScheme; + SelfSchedulingScheme schedulingScheme = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) chunkParam = 1; @@ -130,9 +130,9 @@ template uint64_t oneChunk = len / this->_numQueues; int remainder = len - (oneChunk * this->_numQueues); std::vector lps; - lps.emplace_back(method, oneChunk + remainder, chunkParam, this->_numThreads, false); + lps.emplace_back(schedulingScheme, oneChunk + remainder, chunkParam, this->_numThreads, false); for (int i = 1; i < this->_numQueues; i++) { - lps.emplace_back(method, oneChunk, chunkParam, this->_numThreads, false); + lps.emplace_back(schedulingScheme, oneChunk, 
chunkParam, this->_numThreads, false); } if (ctx->getUserConfig().pinWorkers) { for (int i = 0; i < this->_numQueues; i++) { @@ -163,9 +163,9 @@ template } } else { bool autoChunk = false; - if (method == AUTO) + if (schedulingScheme == SelfSchedulingScheme::AUTO) autoChunk = true; - LoadPartitioning lp(method, len, chunkParam, this->_numThreads, autoChunk); + LoadPartitioning lp(schedulingScheme, len, chunkParam, this->_numThreads, autoChunk); if (ctx->getUserConfig().pinWorkers) { while (lp.hasNextChunk()) { endChunk += lp.getNextChunk(); @@ -297,12 +297,12 @@ template uint64_t endChunk = device_task_len; uint64_t currentItr = 0; uint64_t target; - int method = ctx->config.taskPartitioningScheme; + SelfSchedulingScheme method = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) chunkParam = 1; bool autoChunk = false; - if (method == AUTO) + if (method == SelfSchedulingScheme::AUTO) autoChunk = true; LoadPartitioning lp(method, cpu_task_len, chunkParam, this->_numCPPThreads, autoChunk); diff --git a/src/runtime/local/vectorized/MTWrapper_sparse.cpp b/src/runtime/local/vectorized/MTWrapper_sparse.cpp index 91fddcf83..75ad2f840 100644 --- a/src/runtime/local/vectorized/MTWrapper_sparse.cpp +++ b/src/runtime/local/vectorized/MTWrapper_sparse.cpp @@ -68,7 +68,7 @@ void MTWrapper>::executeCpuQueues( uint64_t endChunk = 0; uint64_t currentItr = 0; uint64_t target; - int method = ctx->config.taskPartitioningScheme; + SelfSchedulingScheme schedulingScheme = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) chunkParam = 1; @@ -76,9 +76,9 @@ void MTWrapper>::executeCpuQueues( uint64_t oneChunk = len / this->_numQueues; int remainder = len - (oneChunk * this->_numQueues); std::vector lps; - lps.emplace_back(method, oneChunk + remainder, chunkParam, this->_numThreads, false); + lps.emplace_back(schedulingScheme, oneChunk + remainder, chunkParam, this->_numThreads, false); for (int i = 1; i < this->_numQueues; i++) { - lps.emplace_back(method, oneChunk, chunkParam, this->_numThreads, false); + lps.emplace_back(schedulingScheme, oneChunk, chunkParam, this->_numThreads, false); } if (ctx->getUserConfig().pinWorkers) { for (int i = 0; i < this->_numQueues; i++) { @@ -109,10 +109,10 @@ void MTWrapper>::executeCpuQueues( } } else { bool autoChunk = false; - if (method == AUTO) + if (schedulingScheme == SelfSchedulingScheme::AUTO) autoChunk = true; - LoadPartitioning lp(method, len, chunkParam, this->_numThreads, autoChunk); + LoadPartitioning lp(schedulingScheme, len, chunkParam, this->_numThreads, autoChunk); if (ctx->getUserConfig().pinWorkers) { while (lp.hasNextChunk()) { endChunk += lp.getNextChunk(); diff --git a/src/runtime/local/vectorized/WorkerCPU.h b/src/runtime/local/vectorized/WorkerCPU.h index 7ec61728b..df33e7147 100644 --- a/src/runtime/local/vectorized/WorkerCPU.h +++ b/src/runtime/local/vectorized/WorkerCPU.h @@ -32,17 +32,18 @@ class WorkerCPU : public Worker { int _threadID; int _numQueues; QueueTypeOption _queueMode; - int _stealLogic; + VictimSelectionLogic _victimSelection; bool _pinWorkers; public: // ToDo: remove compile-time verbose parameter and use logger WorkerCPU(std::vector deques, std::vector physical_ids, std::vector unique_threads, DCTX(dctx), bool verbose, uint32_t fid = 0, uint32_t batchSize = 100, int threadID = 0, int numQueues = 0, - QueueTypeOption queueMode = QueueTypeOption::CENTRALIZED, int stealLogic = 0, bool pinWorkers = 0) + QueueTypeOption queueMode = 
QueueTypeOption::CENTRALIZED, + VictimSelectionLogic victimSelection = VictimSelectionLogic::SEQ, bool pinWorkers = 0) : Worker(dctx), _q(deques), _physical_ids(physical_ids), _unique_threads(unique_threads), _verbose(verbose), _fid(fid), _batchSize(batchSize), _threadID(threadID), _numQueues(numQueues), _queueMode(queueMode), - _stealLogic(stealLogic), _pinWorkers(pinWorkers) { + _victimSelection(victimSelection), _pinWorkers(pinWorkers) { // at last, start the thread t = std::make_unique(&WorkerCPU::run, this); } @@ -87,7 +88,7 @@ class WorkerCPU : public Worker { // queues. if (_numQueues > 1) { - if (_stealLogic == 0) { + if (_victimSelection == VictimSelectionLogic::SEQ) { // Stealing in sequential order targetQueue = (targetQueue + 1) % _numQueues; @@ -101,7 +102,7 @@ class WorkerCPU : public Worker { delete t; } } - } else if (_stealLogic == 1) { + } else if (_victimSelection == VictimSelectionLogic::SEQPRI) { // Stealing in sequential order from same domain first if (_queueMode == QueueTypeOption::PERCPU) { targetQueue = (targetQueue + 1) % _numQueues; @@ -134,7 +135,7 @@ class WorkerCPU : public Worker { delete t; } } - } else if (_stealLogic == 2) { + } else if (_victimSelection == VictimSelectionLogic::RANDOM) { // stealing from random workers until all workers EOF eofWorkers.fill(false); @@ -153,7 +154,7 @@ class WorkerCPU : public Worker { } } - } else if (_stealLogic == 3) { + } else if (_victimSelection == VictimSelectionLogic::RANDOMPRI) { // stealing from random workers from same socket first int queuesThisDomain = 0; eofWorkers.fill(false); diff --git a/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp b/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp index 1c1103e2e..ed621f02a 100644 --- a/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp +++ b/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp @@ -42,7 +42,7 @@ TEMPLATE_PRODUCT_TEST_CASE("Multi-threaded-scheduling", TAG_VECTORIZED, (DATA_TY using DT = TestType; using VT = typename DT::VT; auto dctx = setupContextAndLogger(); - dctx->config.taskPartitioningScheme = GSS; + dctx->config.taskPartitioningScheme = SelfSchedulingScheme::GSS; dctx->config.minimumTaskSize = 50; DT *m1 = nullptr, *m2 = nullptr; From 7a24ee7f64a1c2bd65da144ae57d0066231756c4 Mon Sep 17 00:00:00 2001 From: philipportner Date: Fri, 1 Nov 2024 00:35:27 +0100 Subject: [PATCH 4/7] [kernels] minor code cleanup in pipeline code - remove unused function - use initializer list - fix uninitialized variables - refactor duplicate code - const qualifier, auto - removed unused imports - std::move --- .../local/vectorized/LoadPartitioning.h | 32 +++++++++---------- src/runtime/local/vectorized/MTWrapper.h | 9 ++---- .../local/vectorized/MTWrapper_dense.cpp | 30 ++++++++--------- .../local/vectorized/MTWrapper_sparse.cpp | 28 ++++++++-------- src/runtime/local/vectorized/TaskQueues.h | 20 ++++-------- .../local/vectorized/VectorizedDataSink.h | 4 +-- src/runtime/local/vectorized/Worker.h | 2 +- src/runtime/local/vectorized/WorkerCPU.h | 13 ++++---- 8 files changed, 62 insertions(+), 76 deletions(-) diff --git a/src/runtime/local/vectorized/LoadPartitioning.h b/src/runtime/local/vectorized/LoadPartitioning.h index c9609bee6..efd951f18 100644 --- a/src/runtime/local/vectorized/LoadPartitioning.h +++ b/src/runtime/local/vectorized/LoadPartitioning.h @@ -19,9 +19,8 @@ #include "LoadPartitioningDefs.h" #include +#include #include -#include -#include class LoadPartitioning { @@ -37,8 +36,8 @@ class LoadPartitioning { uint64_t tssDelta; 
uint64_t mfscChunk; uint32_t fissStages; - int getMethod(const char *method) { return std::stoi(method); } - int getStages(int tasks, int workers) { + + static int getStages(int tasks, int workers) { int actual_step = 0; int scheduled = 0; int step = 0; @@ -52,12 +51,11 @@ class LoadPartitioning { } public: - LoadPartitioning(SelfSchedulingScheme method, uint64_t tasks, uint64_t chunk, uint32_t workers, bool autoChunk) { - schedulingMethod = method; - totalTasks = tasks; + LoadPartitioning(SelfSchedulingScheme method, uint64_t tasks, uint64_t chunk, uint32_t workers, bool autoChunk) + : schedulingMethod(method), totalTasks(tasks), fissStages(getStages(totalTasks, workers)) { double tSize = (totalTasks + workers - 1.0) / workers; mfscChunk = ceil(tSize * log(2.0) / log((1.0 * tSize))); - fissStages = getStages(totalTasks, workers); + if (!autoChunk) { chunkParam = chunk; } else { @@ -74,10 +72,12 @@ class LoadPartitioning { schedulingStep = 0; scheduledTasks = 0; tssChunk = (uint64_t)ceil((double)totalTasks / ((double)2.0 * totalWorkers)); - uint64_t nTemp = (uint64_t)ceil(2.0 * totalTasks / (tssChunk + 1.0)); + auto nTemp = (uint64_t)ceil(2.0 * totalTasks / (tssChunk + 1.0)); tssDelta = (uint64_t)(tssChunk - 1.0) / (double)(nTemp - 1.0); } - bool hasNextChunk() { return scheduledTasks < totalTasks; } + + [[nodiscard]] bool hasNextChunk() const { return scheduledTasks < totalTasks; } + uint64_t getNextChunk() { uint64_t chunkSize = 0; switch (schedulingMethod) { @@ -98,7 +98,7 @@ class LoadPartitioning { break; } case SelfSchedulingScheme::FAC2: { - uint64_t actualStep = schedulingStep / totalWorkers; // has to be an integer division + const uint64_t actualStep = schedulingStep / totalWorkers; // has to be an integer division chunkSize = (uint64_t)ceil(pow(0.5, actualStep + 1) * (totalTasks / totalWorkers)); break; } @@ -108,8 +108,8 @@ class LoadPartitioning { } case SelfSchedulingScheme::FISS: { // TODO - uint64_t X = fissStages + 2; - uint64_t initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); + const uint64_t X = fissStages + 2; + auto initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); chunkSize = initChunk + schedulingStep * (uint64_t)ceil((2.0 * totalTasks * (1.0 - (fissStages / X))) / (totalWorkers * fissStages * @@ -119,13 +119,13 @@ class LoadPartitioning { case SelfSchedulingScheme::VISS: { // TODO uint64_t schedulingStepnew = schedulingStep / totalWorkers; - uint64_t initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); + auto initChunk = (uint64_t)ceil(totalTasks / ((2.0 + fissStages) * totalWorkers)); chunkSize = initChunk * (uint64_t)ceil((double)(1 - pow(0.5, schedulingStepnew)) / 0.5); break; } case SelfSchedulingScheme::PLS: { // TODO - double SWR = 0.5; // static workload ratio + const double SWR = 0.5; // static workload ratio if (remainingTasks > totalTasks - (totalTasks * SWR)) { chunkSize = (uint64_t)ceil((double)totalTasks * SWR / totalWorkers); } else { @@ -136,7 +136,7 @@ class LoadPartitioning { case SelfSchedulingScheme::PSS: { // probabilistic self-scheduling (PSS) // E[P] is the average number of idle processor, for now we use // still totalWorkers - double averageIdleProc = (double)totalWorkers; + auto averageIdleProc = (double)totalWorkers; chunkSize = (uint64_t)ceil((double)remainingTasks / (1.5 * averageIdleProc)); // TODO break; diff --git a/src/runtime/local/vectorized/MTWrapper.h b/src/runtime/local/vectorized/MTWrapper.h index 2aa50fadc..f823180eb 100644 --- 
a/src/runtime/local/vectorized/MTWrapper.h +++ b/src/runtime/local/vectorized/MTWrapper.h @@ -28,14 +28,11 @@ #include -#include #include -#include #include #include -// TODO use the wrapper to cache threads // TODO generalize for arbitrary inputs (not just binary) using mlir::daphne::VectorCombine; @@ -104,14 +101,14 @@ template class MTWrapperBase { void get_topology(std::vector &physicalIds, std::vector &uniqueThreads, std::vector &responsibleThreads) { - hwloc_topology_t topology; + hwloc_topology_t topology = nullptr; hwloc_topology_init(&topology); hwloc_topology_load(topology); - hwloc_obj_t package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, NULL); + hwloc_obj_t package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, nullptr); - while (package != NULL) { + while (package != nullptr) { auto package_id = package->os_index; hwloc_recurse_topology(topology, package, package_id, physicalIds, uniqueThreads, responsibleThreads); package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, package); diff --git a/src/runtime/local/vectorized/MTWrapper_dense.cpp b/src/runtime/local/vectorized/MTWrapper_dense.cpp index ea1dd49da..a6b2cadfa 100644 --- a/src/runtime/local/vectorized/MTWrapper_dense.cpp +++ b/src/runtime/local/vectorized/MTWrapper_dense.cpp @@ -121,7 +121,7 @@ template uint64_t startChunk = 0; uint64_t endChunk = 0; uint64_t currentItr = 0; - uint64_t target; + uint64_t target = 0; SelfSchedulingScheme schedulingScheme = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) @@ -138,13 +138,12 @@ template for (int i = 0; i < this->_numQueues; i++) { while (lps[i].hasNextChunk()) { endChunk += lps[i].getNextChunk(); - qvector[i]->enqueueTaskPinned( - new CompiledPipelineTask>( - CompiledPipelineTaskData>{funcs, isScalar, inputs, numInputs, numOutputs, - outRows, outCols, splits, combines, startChunk, - endChunk, outRows, outCols, 0, ctx}, - resLock, res), - this->topologyResponsibleThreads[i]); + qvector[i]->enqueueTask(new CompiledPipelineTask>( + CompiledPipelineTaskData>{ + funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, + splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, + resLock, res), + this->topologyResponsibleThreads[i]); startChunk = endChunk; } } @@ -170,13 +169,12 @@ template while (lp.hasNextChunk()) { endChunk += lp.getNextChunk(); target = currentItr % this->_numQueues; - qvector[target]->enqueueTaskPinned( - new CompiledPipelineTask>( - CompiledPipelineTaskData>{funcs, isScalar, inputs, numInputs, numOutputs, - outRows, outCols, splits, combines, startChunk, - endChunk, outRows, outCols, 0, ctx}, - resLock, res), - this->topologyUniqueThreads[target]); + qvector[target]->enqueueTask(new CompiledPipelineTask>( + CompiledPipelineTaskData>{ + funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, + splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, + resLock, res), + this->topologyUniqueThreads[target]); startChunk = endChunk; currentItr++; } @@ -296,7 +294,7 @@ template uint64_t startChunk = device_task_len; uint64_t endChunk = device_task_len; uint64_t currentItr = 0; - uint64_t target; + uint64_t target = 0; SelfSchedulingScheme method = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) diff --git a/src/runtime/local/vectorized/MTWrapper_sparse.cpp b/src/runtime/local/vectorized/MTWrapper_sparse.cpp index 75ad2f840..40673eb84 100644 --- 
a/src/runtime/local/vectorized/MTWrapper_sparse.cpp +++ b/src/runtime/local/vectorized/MTWrapper_sparse.cpp @@ -67,7 +67,7 @@ void MTWrapper>::executeCpuQueues( uint64_t startChunk = 0; uint64_t endChunk = 0; uint64_t currentItr = 0; - uint64_t target; + uint64_t target = 0; SelfSchedulingScheme schedulingScheme = ctx->config.taskPartitioningScheme; int chunkParam = ctx->config.minimumTaskSize; if (chunkParam <= 0) @@ -84,13 +84,12 @@ void MTWrapper>::executeCpuQueues( for (int i = 0; i < this->_numQueues; i++) { while (lps[i].hasNextChunk()) { endChunk += lps[i].getNextChunk(); - qvector[i]->enqueueTaskPinned( - new CompiledPipelineTask>( - CompiledPipelineTaskData>{funcs, isScalar, inputs, numInputs, numOutputs, - outRows, outCols, splits, combines, startChunk, - endChunk, outRows, outCols, 0, ctx}, - dataSinks), - this->topologyResponsibleThreads[i]); + qvector[i]->enqueueTask(new CompiledPipelineTask>( + CompiledPipelineTaskData>{ + funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, + splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, + dataSinks), + this->topologyResponsibleThreads[i]); startChunk = endChunk; } } @@ -117,13 +116,12 @@ void MTWrapper>::executeCpuQueues( while (lp.hasNextChunk()) { endChunk += lp.getNextChunk(); target = currentItr % this->_numQueues; - qvector[target]->enqueueTaskPinned( - new CompiledPipelineTask>( - CompiledPipelineTaskData>{funcs, isScalar, inputs, numInputs, numOutputs, outRows, - outCols, splits, combines, startChunk, endChunk, - outRows, outCols, 0, ctx}, - dataSinks), - target); + qvector[target]->enqueueTask(new CompiledPipelineTask>( + CompiledPipelineTaskData>{ + funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, + splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, + dataSinks), + target); startChunk = endChunk; currentItr++; } diff --git a/src/runtime/local/vectorized/TaskQueues.h b/src/runtime/local/vectorized/TaskQueues.h index 82745216b..801fdde64 100644 --- a/src/runtime/local/vectorized/TaskQueues.h +++ b/src/runtime/local/vectorized/TaskQueues.h @@ -29,7 +29,8 @@ class TaskQueue { virtual ~TaskQueue() = default; virtual void enqueueTask(Task *t) = 0; - virtual void enqueueTaskPinned(Task *t, int targetCPU) = 0; + // overload to pin a Task to a certain CPU + virtual void enqueueTask(Task *t, int targetCPU) = 0; virtual Task *dequeueTask() = 0; virtual uint64_t size() = 0; virtual void closeInput() = 0; @@ -46,10 +47,7 @@ class BlockingTaskQueue : public TaskQueue { public: BlockingTaskQueue() : BlockingTaskQueue(DEFAULT_MAX_SIZE) {} - explicit BlockingTaskQueue(uint64_t capacity) { - _closedInput = false; - _capacity = capacity; - } + explicit BlockingTaskQueue(uint64_t capacity) : _capacity(capacity), _closedInput(false) {} ~BlockingTaskQueue() override = default; void enqueueTask(Task *t) override { @@ -65,18 +63,13 @@ class BlockingTaskQueue : public TaskQueue { _cv.notify_one(); } - void enqueueTaskPinned(Task *t, int targetCPU) override { + void enqueueTask(Task *t, int targetCPU) override { // Change CPU pinning before enqueue to utilize NUMA first-touch policy cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(targetCPU, &cpuset); sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); - std::unique_lock lk(_qmutex); - while (_data.size() + 1 > _capacity) - _cv.wait(lk); - _data.push_back(t); - lk.unlock(); - _cv.notify_one(); + enqueueTask(t); } Task *dequeueTask() override { @@ -86,8 +79,7 @@ class BlockingTaskQueue : public TaskQueue { while (_data.empty()) { if (_closedInput) 
return &_eof; - else - _cv.wait(lk); + _cv.wait(lk); } // obtain next task Task *t = _data.front(); diff --git a/src/runtime/local/vectorized/VectorizedDataSink.h b/src/runtime/local/vectorized/VectorizedDataSink.h index 730bb8bb7..be8d37979 100644 --- a/src/runtime/local/vectorized/VectorizedDataSink.h +++ b/src/runtime/local/vectorized/VectorizedDataSink.h @@ -56,7 +56,7 @@ template class VectorizedDataSink> { case VectorCombine::COLS: { auto rows = matrix->getNumRows(); auto *off = matrix->getRowOffsets(); - auto *rowNnz = &_rowNnz[0]; + auto *rowNnz = _rowNnz.data(); PRAGMA_LOOP_VECTORIZE for (size_t row = 0; row < rows; ++row) { rowNnz[row] += off[row + 1] - off[row]; @@ -102,7 +102,7 @@ template class VectorizedDataSink> { DataObjectFactory::destroy(currMat); } } else if (_combine == VectorCombine::COLS) { - auto *rowNnz = &_rowNnz[0]; + auto *rowNnz = _rowNnz.data(); PRAGMA_LOOP_VECTORIZE for (size_t row = 0; row < _numRows; ++row) { resRowOff[row + 1] = resRowOff[row] + rowNnz[row]; diff --git a/src/runtime/local/vectorized/Worker.h b/src/runtime/local/vectorized/Worker.h index 62264432c..650032c9c 100644 --- a/src/runtime/local/vectorized/Worker.h +++ b/src/runtime/local/vectorized/Worker.h @@ -30,7 +30,7 @@ class Worker { // Worker only used as derived class, which starts the thread after the // class has been constructed (order matters). - explicit Worker(DCTX(dctx)) : t(), ctx(dctx) {} + explicit Worker(DCTX(dctx)) : ctx(dctx) {} public: // Worker is move only due to std::thread. Therefore, we delete the copy diff --git a/src/runtime/local/vectorized/WorkerCPU.h b/src/runtime/local/vectorized/WorkerCPU.h index df33e7147..b364aee3b 100644 --- a/src/runtime/local/vectorized/WorkerCPU.h +++ b/src/runtime/local/vectorized/WorkerCPU.h @@ -18,14 +18,14 @@ #include "Worker.h" #include - #include +#include class WorkerCPU : public Worker { std::vector _q; std::vector _physical_ids; std::vector _unique_threads; - std::array eofWorkers; + std::array eofWorkers{}; bool _verbose; uint32_t _fid; uint32_t _batchSize; @@ -40,10 +40,11 @@ class WorkerCPU : public Worker { WorkerCPU(std::vector deques, std::vector physical_ids, std::vector unique_threads, DCTX(dctx), bool verbose, uint32_t fid = 0, uint32_t batchSize = 100, int threadID = 0, int numQueues = 0, QueueTypeOption queueMode = QueueTypeOption::CENTRALIZED, - VictimSelectionLogic victimSelection = VictimSelectionLogic::SEQ, bool pinWorkers = 0) - : Worker(dctx), _q(deques), _physical_ids(physical_ids), _unique_threads(unique_threads), _verbose(verbose), - _fid(fid), _batchSize(batchSize), _threadID(threadID), _numQueues(numQueues), _queueMode(queueMode), - _victimSelection(victimSelection), _pinWorkers(pinWorkers) { + VictimSelectionLogic victimSelection = VictimSelectionLogic::SEQ, bool pinWorkers = false) + : Worker(dctx), _q(std::move(deques)), _physical_ids(std::move(physical_ids)), + _unique_threads(std::move(unique_threads)), _verbose(verbose), _fid(fid), _batchSize(batchSize), + _threadID(threadID), _numQueues(numQueues), _queueMode(queueMode), _victimSelection(victimSelection), + _pinWorkers(pinWorkers) { // at last, start the thread t = std::make_unique(&WorkerCPU::run, this); } From ef93617cb7bb848f6d59e10d2b93fedda4c4fe72 Mon Sep 17 00:00:00 2001 From: philipportner Date: Fri, 1 Nov 2024 00:54:56 +0100 Subject: [PATCH 5/7] [#892] explicitly use MSTATIC in LoadPartitioning This patch adds MSTATIC to the switch case in the LoadPartitioning. Before, this was implicitly handled by the default case. 
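For clarity, the new case label simply falls through to the existing default computation, i.e. roughly a quarter of the per-worker static chunk; this sketch is taken from the diff below:

    case SelfSchedulingScheme::MSTATIC:
    default: {
        chunkSize = (uint64_t)ceil(totalTasks / totalWorkers / 4.0);
        break;
    }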
Closes #892 --- src/runtime/local/vectorized/LoadPartitioning.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/runtime/local/vectorized/LoadPartitioning.h b/src/runtime/local/vectorized/LoadPartitioning.h index efd951f18..0dd6ae5a9 100644 --- a/src/runtime/local/vectorized/LoadPartitioning.h +++ b/src/runtime/local/vectorized/LoadPartitioning.h @@ -145,6 +145,7 @@ class LoadPartitioning { chunkSize = mfscChunk; break; } + case SelfSchedulingScheme::MSTATIC: default: { chunkSize = (uint64_t)ceil(totalTasks / totalWorkers / 4.0); break; From abc6c38fe8f5e1beaac653a1c257940c91eda2c6 Mon Sep 17 00:00:00 2001 From: philipportner Date: Thu, 31 Oct 2024 11:53:37 +0100 Subject: [PATCH 6/7] [#877] persist hwloc info in different pipelines This patch persists the hwloc topology info used in vectorized pipelines across the execution of multiple pipelines. This significantly reduces overhead in workloads that consist of multiple pipelines. With the tested script, this patch reduced overall runtimes from ~6s to ~0.9s. Closes #877 Closes #890 --- .../local/kernels/VectorizedPipeline.h | 4 +- src/runtime/local/vectorized/MTWrapper.h | 87 +++++-------------- .../local/vectorized/MTWrapper_dense.cpp | 4 +- .../local/vectorized/MTWrapper_sparse.cpp | 2 +- .../local/vectorized/PipelineHWlocInfo.h | 64 ++++++++++++++ src/runtime/local/vectorized/WorkerCPU.h | 2 + .../vectorized/MultiThreadedKernelTest.cpp | 9 +- 7 files changed, 100 insertions(+), 72 deletions(-) create mode 100644 src/runtime/local/vectorized/PipelineHWlocInfo.h diff --git a/src/runtime/local/kernels/VectorizedPipeline.h b/src/runtime/local/kernels/VectorizedPipeline.h index 7daaedc6f..c9a265f97 100644 --- a/src/runtime/local/kernels/VectorizedPipeline.h +++ b/src/runtime/local/kernels/VectorizedPipeline.h @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -35,7 +36,8 @@ template struct VectorizedPipeline { static void apply(DTRes **outputs, size_t numOutputs, bool *isScalar, Structure **inputs, size_t numInputs, int64_t *outRows, int64_t *outCols, int64_t *splits, int64_t *combines, size_t numFuncs, void **fun, DCTX(ctx)) { - auto wrapper = std::make_unique>(numFuncs, ctx); + static PipelineHWlocInfo topology{ctx}; + auto wrapper = std::make_unique>(numFuncs, topology, ctx); std::vector> funcs; for (auto i = 0ul; i < numFuncs; ++i) { diff --git a/src/runtime/local/vectorized/MTWrapper.h b/src/runtime/local/vectorized/MTWrapper.h index f823180eb..b735df155 100644 --- a/src/runtime/local/vectorized/MTWrapper.h +++ b/src/runtime/local/vectorized/MTWrapper.h @@ -20,16 +20,20 @@ #include "runtime/local/datastructures/AllocationDescriptorCUDA.h" #endif +#include "PipelineHWlocInfo.h" + #include #include #include #include #include +#include #include #include #include +#include #include @@ -42,9 +46,6 @@ template class MTWrapperBase { protected: std::vector> cuda_workers; std::vector> cpp_workers; - std::vector topologyPhysicalIds; - std::vector topologyUniqueThreads; - std::vector topologyResponsibleThreads; size_t _numThreads{}; uint32_t _numCPPThreads{}; uint32_t _numCUDAThreads{}; @@ -52,6 +53,7 @@ template class MTWrapperBase { int _numQueues; VictimSelectionLogic _victimSelection; int _totalNumaDomains; + PipelineHWlocInfo _topology; DCTX(_ctx); std::pair getInputProperties(Structure **inputs, size_t numInputs, VectorSplit *splits) { @@ -68,55 +70,6 @@ template class MTWrapperBase { return std::make_pair(len, mem_required); } - void hwloc_recurse_topology(hwloc_topology_t topo, hwloc_obj_t obj, unsigned int
parent_package_id, - std::vector &physicalIds, std::vector &uniqueThreads, - std::vector &responsibleThreads) { - if (obj->type != HWLOC_OBJ_CORE) { - for (unsigned int i = 0; i < obj->arity; i++) { - hwloc_recurse_topology(topo, obj->children[i], parent_package_id, physicalIds, uniqueThreads, - responsibleThreads); - } - } else { - physicalIds.push_back(parent_package_id); - for (unsigned int i = 0; i < obj->arity; i++) - uniqueThreads.push_back(obj->children[i]->os_index); - - switch (_ctx->getUserConfig().queueSetupScheme) { - case QueueTypeOption::CENTRALIZED: { - responsibleThreads.push_back(0); - break; - } - case QueueTypeOption::PERGROUP: { - if (responsibleThreads.size() == parent_package_id) - responsibleThreads.push_back(obj->children[0]->os_index); - break; - } - case QueueTypeOption::PERCPU: { - responsibleThreads.push_back(obj->os_index); - break; - } - } - } - } - - void get_topology(std::vector &physicalIds, std::vector &uniqueThreads, - std::vector &responsibleThreads) { - hwloc_topology_t topology = nullptr; - - hwloc_topology_init(&topology); - hwloc_topology_load(topology); - - hwloc_obj_t package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, nullptr); - - while (package != nullptr) { - auto package_id = package->os_index; - hwloc_recurse_topology(topology, package, package_id, physicalIds, uniqueThreads, responsibleThreads); - package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, package); - } - - hwloc_topology_destroy(topology); - } - void initCPPWorkers(std::vector &qvector, uint32_t batchSize, const bool verbose = false, int numQueues = 0, QueueTypeOption queueMode = QueueTypeOption::CENTRALIZED, bool pinWorkers = false) { @@ -128,7 +81,9 @@ template class MTWrapperBase { int i = 0; for (auto &w : cpp_workers) { - w = std::make_unique(qvector, topologyPhysicalIds, topologyUniqueThreads, _ctx, verbose, 0, + _ctx->logger->debug("creating worker {} with topology {}, size={}", i, _topology.physicalIds, + _topology.physicalIds.size()); + w = std::make_unique(qvector, _topology.physicalIds, _topology.uniqueThreads, _ctx, verbose, 0, batchSize, i, numQueues, queueMode, this->_victimSelection, pinWorkers); i++; } } @@ -181,17 +136,17 @@ template class MTWrapperBase { } public: - explicit MTWrapperBase(uint32_t numFunctions, DCTX(ctx)) : _ctx(ctx) { + explicit MTWrapperBase(uint32_t numFunctions, PipelineHWlocInfo topology, DCTX(ctx)) + : _topology(std::move(topology)), _ctx(ctx) { _ctx->logger->debug("Querying cpu topology"); - get_topology(topologyPhysicalIds, topologyUniqueThreads, topologyResponsibleThreads); if (ctx->config.numberOfThreads > 0) _numCPPThreads = ctx->config.numberOfThreads; else - _numCPPThreads = topologyPhysicalIds.size(); + _numCPPThreads = _topology.physicalIds.size(); if (_ctx->getUserConfig().queueSetupScheme != QueueTypeOption::CENTRALIZED) - _numCPPThreads = topologyUniqueThreads.size(); + _numCPPThreads = _topology.uniqueThreads.size(); // If the available CPUs from Slurm is less than the configured num // threads, use the value from Slurm @@ -207,10 +162,10 @@ template class MTWrapperBase { _queueMode = QueueTypeOption::CENTRALIZED; _numQueues = 1; _victimSelection = _ctx->getUserConfig().victimSelection; - if (std::thread::hardware_concurrency() < topologyUniqueThreads.size() && _ctx->config.hyperthreadingEnabled) - topologyUniqueThreads.resize(_numCPPThreads); + if (std::thread::hardware_concurrency() < _topology.uniqueThreads.size() && _ctx->config.hyperthreadingEnabled) +
_topology.uniqueThreads.resize(_numCPPThreads); _numThreads = _numCPPThreads + _numCUDAThreads; - _totalNumaDomains = std::set(topologyPhysicalIds.begin(), topologyPhysicalIds.end()).size(); + _totalNumaDomains = std::set(_topology.physicalIds.begin(), _topology.physicalIds.end()).size(); if (_ctx->getUserConfig().queueSetupScheme == QueueTypeOption::PERGROUP) { _queueMode = QueueTypeOption::PERGROUP; @@ -223,15 +178,15 @@ template class MTWrapperBase { // ToDo: use logger if (_ctx->config.debugMultiThreading) { std::cout << "topologyPhysicalIds:" << std::endl; - for (const auto &topologyEntry : topologyPhysicalIds) { + for (const auto &topologyEntry : _topology.physicalIds) { std::cout << topologyEntry << ','; } std::cout << std::endl << "topologyUniqueThreads:" << std::endl; - for (const auto &topologyEntry : topologyUniqueThreads) { + for (const auto &topologyEntry : _topology.uniqueThreads) { std::cout << topologyEntry << ','; } std::cout << std::endl << "topologyResponsibleThreads:" << std::endl; - for (const auto &topologyEntry : topologyResponsibleThreads) { + for (const auto &topologyEntry : _topology.responsibleThreads) { std::cout << topologyEntry << ','; } std::cout << std::endl << "_totalNumaDomains=" << _totalNumaDomains << std::endl; @@ -250,7 +205,8 @@ template class MTWrapper> : public MTWrapperBase ***, Structure **, DCTX(ctx)); - explicit MTWrapper(uint32_t numFunctions, DCTX(ctx)) : MTWrapperBase>(numFunctions, ctx) {} + explicit MTWrapper(uint32_t numFunctions, PipelineHWlocInfo topology, DCTX(ctx)) + : MTWrapperBase>(numFunctions, topology, ctx) {} [[maybe_unused]] void executeSingleQueue(std::vector> funcs, DenseMatrix ***res, const bool *isScalar, Structure **inputs, size_t numInputs, @@ -276,7 +232,8 @@ template class MTWrapper> : public MTWrapperBase ***, Structure **, DCTX(ctx)); - explicit MTWrapper(uint32_t numFunctions, DCTX(ctx)) : MTWrapperBase>(numFunctions, ctx) {} + explicit MTWrapper(uint32_t numFunctions, PipelineHWlocInfo topology, DCTX(ctx)) + : MTWrapperBase>(numFunctions, topology, ctx) {} [[maybe_unused]] void executeSingleQueue(std::vector> funcs, CSRMatrix ***res, const bool *isScalar, Structure **inputs, size_t numInputs, diff --git a/src/runtime/local/vectorized/MTWrapper_dense.cpp b/src/runtime/local/vectorized/MTWrapper_dense.cpp index a6b2cadfa..87320da08 100644 --- a/src/runtime/local/vectorized/MTWrapper_dense.cpp +++ b/src/runtime/local/vectorized/MTWrapper_dense.cpp @@ -143,7 +143,7 @@ template funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, resLock, res), - this->topologyResponsibleThreads[i]); + this->_topology.responsibleThreads[i]); startChunk = endChunk; } } @@ -174,7 +174,7 @@ template funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, resLock, res), - this->topologyUniqueThreads[target]); + this->_topology.uniqueThreads[target]); startChunk = endChunk; currentItr++; } diff --git a/src/runtime/local/vectorized/MTWrapper_sparse.cpp b/src/runtime/local/vectorized/MTWrapper_sparse.cpp index 40673eb84..3e1299152 100644 --- a/src/runtime/local/vectorized/MTWrapper_sparse.cpp +++ b/src/runtime/local/vectorized/MTWrapper_sparse.cpp @@ -89,7 +89,7 @@ void MTWrapper>::executeCpuQueues( funcs, isScalar, inputs, numInputs, numOutputs, outRows, outCols, splits, combines, startChunk, endChunk, outRows, outCols, 0, ctx}, dataSinks), - this->topologyResponsibleThreads[i]); + 
this->_topology.responsibleThreads[i]); startChunk = endChunk; } } diff --git a/src/runtime/local/vectorized/PipelineHWlocInfo.h b/src/runtime/local/vectorized/PipelineHWlocInfo.h new file mode 100644 index 000000000..f8a77bff1 --- /dev/null +++ b/src/runtime/local/vectorized/PipelineHWlocInfo.h @@ -0,0 +1,64 @@ +#pragma once + +#include "LoadPartitioningDefs.h" +#include +#include +#include + +struct PipelineHWlocInfo { + std::vector physicalIds; + std::vector uniqueThreads; + std::vector responsibleThreads; + bool hwloc_initialized = false; + QueueTypeOption queueSetupScheme = QueueTypeOption::CENTRALIZED; + + PipelineHWlocInfo(DaphneContext *ctx) : PipelineHWlocInfo(ctx->getUserConfig().queueSetupScheme) {} + PipelineHWlocInfo(QueueTypeOption qss) : queueSetupScheme(qss) { get_topology(); } + + void hwloc_recurse_topology(hwloc_topology_t topo, hwloc_obj_t obj, unsigned int parent_package_id) { + if (obj->type != HWLOC_OBJ_CORE) { + for (unsigned int i = 0; i < obj->arity; i++) { + hwloc_recurse_topology(topo, obj->children[i], parent_package_id); + } + } else { + physicalIds.push_back(parent_package_id); + for (unsigned int i = 0; i < obj->arity; i++) + uniqueThreads.push_back(obj->children[i]->os_index); + + switch (queueSetupScheme) { + case QueueTypeOption::CENTRALIZED: { + responsibleThreads.push_back(0); + } break; + case QueueTypeOption::PERGROUP: { + if (responsibleThreads.size() == parent_package_id) + responsibleThreads.push_back(obj->children[0]->os_index); + } break; + case QueueTypeOption::PERCPU: { + responsibleThreads.push_back(obj->os_index); + } break; + } + } + } + + void get_topology() { + if (hwloc_initialized) { + return; + } + + hwloc_topology_t topology = nullptr; + + hwloc_topology_init(&topology); + hwloc_topology_load(topology); + + hwloc_obj_t package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, nullptr); + + while (package != nullptr) { + auto package_id = package->os_index; + hwloc_recurse_topology(topology, package, package_id); + package = hwloc_get_next_obj_by_type(topology, HWLOC_OBJ_PACKAGE, package); + } + + hwloc_topology_destroy(topology); + hwloc_initialized = true; + } +}; diff --git a/src/runtime/local/vectorized/WorkerCPU.h b/src/runtime/local/vectorized/WorkerCPU.h index b364aee3b..d9fdd9578 100644 --- a/src/runtime/local/vectorized/WorkerCPU.h +++ b/src/runtime/local/vectorized/WorkerCPU.h @@ -61,6 +61,8 @@ class WorkerCPU : public Worker { } int currentDomain = _physical_ids[_threadID]; + ctx->logger->debug("Thread{}, _physical_ids.size()={}, capacity={}, currentDomain={}", _threadID, + _physical_ids.size(), _physical_ids.capacity(), currentDomain); int targetQueue = _threadID; if (_queueMode == QueueTypeOption::CENTRALIZED) { targetQueue = 0; diff --git a/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp b/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp index ed621f02a..1283ddf68 100644 --- a/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp +++ b/test/runtime/local/vectorized/MultiThreadedKernelTest.cpp @@ -53,7 +53,8 @@ TEMPLATE_PRODUCT_TEST_CASE("Multi-threaded-scheduling", TAG_VECTORIZED, (DATA_TY ewBinaryMat(BinaryOpCode::ADD, r1, m1, m2, dctx.get()); // single-threaded - auto wrapper = std::make_unique>(1, dctx.get()); + static PipelineHWlocInfo topology{dctx->config.queueSetupScheme}; + auto wrapper = std::make_unique>(1, topology, dctx.get()); DT **outputs[] = {&r2}; bool isScalar[] = {false, false}; Structure *inputs[] = {m1, m2}; @@ -90,7 +91,8 @@ TEMPLATE_PRODUCT_TEST_CASE("Multi-threaded 
X+Y", TAG_VECTORIZED, (DATA_TYPES), ewBinaryMat(BinaryOpCode::ADD, r1, m1, m2, dctx.get()); // single-threaded - auto wrapper = std::make_unique>(1, dctx.get()); + static PipelineHWlocInfo topology{dctx->config.queueSetupScheme}; + auto wrapper = std::make_unique>(1, topology, dctx.get()); DT **outputs[] = {&r2}; bool isScalar[] = {false, false}; Structure *inputs[] = {m1, m2}; @@ -127,7 +129,8 @@ TEMPLATE_PRODUCT_TEST_CASE("Multi-threaded X*Y", TAG_VECTORIZED, (DATA_TYPES), ewBinaryMat(BinaryOpCode::MUL, r1, m1, m2, dctx.get()); // single-threaded - auto wrapper = std::make_unique>(1, dctx.get()); + static PipelineHWlocInfo topology{dctx->config.queueSetupScheme}; + auto wrapper = std::make_unique>(1, topology, dctx.get()); DT **outputs[] = {&r2}; bool isScalar[] = {false, false}; Structure *inputs[] = {m1, m2}; From 287f4c54f28e3accd9953c90c1144e3bf7eb6116 Mon Sep 17 00:00:00 2001 From: philipportner Date: Tue, 12 Nov 2024 22:30:08 +0100 Subject: [PATCH 7/7] [kernels] boolean scalar equality operators EwBinarySca was missing the implementation for boolean equality operators, meaning that simple equality checks for control flow using booleans was not working. Adds a testcase. Previous error: RewriteToCallKernelOpPass failed with the following message [ no kernel for operation `ewEq` available for the required input types `(i1, i1)` and output types `(i1)` A = rand(1,1, 0.0, 1.0, 1.0, -1); x = as.scalar(A[0,0]); terminate = false; while (terminate == false) { if (x > 0.5) { terminate = true; } x = x + 0.5; } print("done."); --- src/runtime/local/kernels/BinaryOpCode.h | 1 + src/runtime/local/kernels/kernels.json | 1 + test/api/cli/controlflow/ControlFlowTest.cpp | 4 ++-- test/api/cli/controlflow/while_17.daphne | 11 +++++++++++ test/api/cli/controlflow/while_17.txt | 1 + 5 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 test/api/cli/controlflow/while_17.daphne create mode 100644 test/api/cli/controlflow/while_17.txt diff --git a/src/runtime/local/kernels/BinaryOpCode.h b/src/runtime/local/kernels/BinaryOpCode.h index 0a2d2d24d..30023586c 100644 --- a/src/runtime/local/kernels/BinaryOpCode.h +++ b/src/runtime/local/kernels/BinaryOpCode.h @@ -165,6 +165,7 @@ static constexpr bool supportsBinaryOp = false; // Concise specification of which binary operations should be supported on // which value types. 
+SUPPORT_EQUALITY(bool) SUPPORT_NUMERIC_FP(double) SUPPORT_NUMERIC_FP(float) SUPPORT_NUMERIC_INT(int64_t) diff --git a/src/runtime/local/kernels/kernels.json b/src/runtime/local/kernels/kernels.json index ae6b3c246..3afb4bc9d 100644 --- a/src/runtime/local/kernels/kernels.json +++ b/src/runtime/local/kernels/kernels.json @@ -2158,6 +2158,7 @@ ["int64_t", "int64_t", "int64_t"], ["uint64_t", "uint64_t", "uint64_t"], ["uint32_t", "uint32_t", "uint32_t"], + ["bool", "bool", "bool"], ["size_t", "size_t", "size_t"], ["const char *", "const char *", "const char *"], ["int64_t", "const char *", "const char *"] diff --git a/test/api/cli/controlflow/ControlFlowTest.cpp b/test/api/cli/controlflow/ControlFlowTest.cpp index 46794d2ee..3eb13c04c 100644 --- a/test/api/cli/controlflow/ControlFlowTest.cpp +++ b/test/api/cli/controlflow/ControlFlowTest.cpp @@ -51,7 +51,7 @@ const std::string dirPath = "test/api/cli/controlflow/"; MAKE_TEST_CASE("if", 8) MAKE_TEST_CASE("for", 23) -MAKE_TEST_CASE("while", 16) +MAKE_TEST_CASE("while", 17) MAKE_TEST_CASE("nested", 26) MAKE_FAILURE_TEST_CASE("stop", 2) @@ -70,4 +70,4 @@ TEST_CASE("loop-with-many-iterations_variadic-op", TAG_CONTROLFLOW) { exp << "Frame(1x1, [col_0:int64_t])" << std::endl << i << std::endl; compareDaphneToStr(exp.str(), dirPath + "for_manyiterations_2.daphne"); compareDaphneToStr(exp.str(), dirPath + "while_manyiterations_2.daphne"); -} \ No newline at end of file +} diff --git a/test/api/cli/controlflow/while_17.daphne b/test/api/cli/controlflow/while_17.daphne new file mode 100644 index 000000000..764f71c20 --- /dev/null +++ b/test/api/cli/controlflow/while_17.daphne @@ -0,0 +1,11 @@ +A = rand(1,1, 0.0, 1.0, 1.0, -1); +x = as.scalar(A[0,0]); + +terminate = false; +while (terminate == false) { + if (x > 0.5) { + terminate = true; + } + x = x + 0.5; +} +print("done."); diff --git a/test/api/cli/controlflow/while_17.txt b/test/api/cli/controlflow/while_17.txt new file mode 100644 index 000000000..70ff8e5af --- /dev/null +++ b/test/api/cli/controlflow/while_17.txt @@ -0,0 +1 @@ +done.