diff --git a/bladebit.vcxproj b/bladebit.vcxproj
index 7391729c..f96c0fb6 100644
--- a/bladebit.vcxproj
+++ b/bladebit.vcxproj
@@ -260,6 +260,7 @@
+
@@ -344,6 +345,12 @@
       true
       true
+
+      true
+      true
+      true
+      true
+
@@ -455,6 +462,7 @@
+
diff --git a/bladebit.vcxproj.filters b/bladebit.vcxproj.filters
index cb9ed00e..1bc262a5 100644
--- a/bladebit.vcxproj.filters
+++ b/bladebit.vcxproj.filters
@@ -213,6 +213,9 @@
       src\util
+
+      src
+
       src
@@ -462,6 +465,9 @@
       src/util
+
+      src
+
       src
@@ -557,6 +563,9 @@
       src\test
+
+      src\test
+
       src\threading
diff --git a/project.js b/project.js
index f52a77ca..fea44ef5 100644
--- a/project.js
+++ b/project.js
@@ -158,6 +158,7 @@ bladebit.configs.test = () => config({
     ,src: [
         'src/test/test_main.cpp'
+        ,'src/test/test_numa_sort.cpp'
        // ,'src/test/TestNuma.cpp'
     ]
diff --git a/src/Globals.h b/src/Globals.h
index 08ec613a..d172ac83 100644
--- a/src/Globals.h
+++ b/src/Globals.h
@@ -146,6 +146,10 @@ struct Span
     : values( values )
     , length( length )
     {}
+
+    inline T& operator[]( unsigned int index ) const { return this->values[index]; }
+    inline T& operator[]( int index ) const { return this->values[index]; }
+
 };

 typedef Span<byte> ByteSpan;
diff --git a/src/SysHost.h b/src/SysHost.h
index 225e3507..a1ee203a 100644
--- a/src/SysHost.h
+++ b/src/SysHost.h
@@ -14,9 +14,10 @@ ImplementFlagOps( VProtect );

 struct NumaInfo
 {
-    uint nodeCount;     // How many NUMA nodes in the system
-    uint cpuCount;      // Total cpu count used by nodes
-    Span<uint>* cpuIds; // CPU ids of each node
+    uint        nodeCount;    // How many NUMA nodes in the system
+    uint        cpuCount;     // Total cpu count used by nodes
+    Span<uint>* cpuIds;       // CPU ids of each node
+    byte*       cpuToNodeMap; // Maps a cpu id to the node it belongs to.
 };

 class SysHost
@@ -69,4 +70,10 @@ class SysHost
     /// Set interleave NUMA mode for the specified memory regions.
     /// NOTE: Pages must not yet be faulted.
     static bool NumaSetMemoryInterleavedMode( void* ptr, size_t size );
+
+    /// Get the node a memory page belongs to.
+    /// Returns a negative value upon failure.
+    /// NOTE: Pages must first be faulted on Linux.
+    static int NumaGetNodeFromPage( void* ptr );
+
 };
\ No newline at end of file
diff --git a/src/algorithm/RadixSort.h b/src/algorithm/RadixSort.h
index fee061e5..1efae2d4 100644
--- a/src/algorithm/RadixSort.h
+++ b/src/algorithm/RadixSort.h
@@ -43,14 +43,19 @@ class RadixSort256
     template
     static void SortWithKey( ThreadPool& pool, T1* input, T1* tmp, TK* keyInput, TK* keyTmp, uint64 length );

+    template
+    static void SortY( ThreadPool& pool, uint64* input, uint64* tmp, uint64 length );
+
+    template
+    static void SortYWithKey( ThreadPool& pool, uint64* input, uint64* tmp, uint32* keyInput, uint32* keyTmp, uint64 length );
+
 private:
-    template
+    template
     static void DoSort( ThreadPool& pool, T1* input, T1* tmp, TK* keyInput, TK* keyTmp, uint64 length );

-    template
+    template
     static void RadixSortThread( SortJob* job );
-
 };

@@ -69,7 +74,21 @@ inline void RadixSort256::SortWithKey( ThreadPool& pool, T1* input, T1* tmp, TK*
 }

 //-----------------------------------------------------------
-template
+template
+inline void RadixSort256::SortY( ThreadPool& pool, uint64* input, uint64* tmp, uint64 length )
+{
+    DoSort( pool, input, tmp, nullptr, nullptr, length );
+}
+
+//-----------------------------------------------------------
+template
+inline void RadixSort256::SortYWithKey( ThreadPool& pool, uint64* input, uint64* tmp, uint32* keyInput, uint32* keyTmp, uint64 length )
+{
+    DoSort( pool, input, tmp, keyInput, keyTmp, length );
+}
+
+//-----------------------------------------------------------
+template
 void inline RadixSort256::DoSort( ThreadPool& pool, T1* input, T1* tmp, TK* keyInput, TK* keyTmp, uint64 length )
 {
     const uint threadCount = ThreadCount > pool.ThreadCount() ? pool.ThreadCount() : ThreadCount;
@@ -110,22 +129,22 @@ void inline RadixSort256::DoSort( ThreadPool& pool, T1* input, T1* tmp, TK* keyI
     jobs[threadCount-1].length += trailingEntries;

     if constexpr ( Mode == SortAndGenKey )
-        pool.RunJob( RadixSortThread, jobs, threadCount );
+        pool.RunJob( RadixSortThread, jobs, threadCount );
     else
-        pool.RunJob( RadixSortThread, jobs, threadCount );
+        pool.RunJob( RadixSortThread, jobs, threadCount );
 }

 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wunused-but-set-variable"

 //-----------------------------------------------------------
-template
+template
 void RadixSort256::RadixSortThread( SortJob* job )
 {
     constexpr uint Radix = 256;

-    const uint32 iterations = sizeof( T1 );
-    const uint32 shiftBase  = 8;
+    constexpr uint32 iterations = MaxIter > 0 ? MaxIter : sizeof( T1 );
+    const uint32 shiftBase = 8;

     uint32 shift = 0;
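The new `SortY`/`SortYWithKey` entry points exist so the sort can stop early: judging from the `iterations` line, `RadixSortThread` gains a pass-count cap (`MaxIter`). A y value is `_K + kExtraBits` bits wide (38 bits for k=32), so it fits in 5 bytes and only 5 of the 8 byte-wide passes needed for a full `uint64` have to run. One side effect: an odd pass count leaves the sorted data in the scratch buffer, which is presumably why `GenerateF1` below now writes F1 output into `yTmp` and sorts into `yBuffer`, and why `FpComputeSingleTable` gains a `yBuffer.Swap()`. A minimal single-threaded sketch of the capped-pass idea (hypothetical names, not the pool-parallel implementation in this diff):

```cpp
#include <cstdint>
#include <cstddef>
#include <utility>

// LSD radix sort on 64-bit keys, 8 bits per pass. MaxIter == 0 sorts all
// 8 bytes; MaxIter == 5 suffices for 38-bit y values.
template<uint32_t MaxIter = 0>
static void RadixSortU64( uint64_t* input, uint64_t* tmp, size_t length )
{
    constexpr uint32_t Radix      = 256;
    constexpr uint32_t iterations = MaxIter > 0 ? MaxIter : (uint32_t)sizeof( uint64_t );

    uint32_t shift = 0;

    for( uint32_t iter = 0; iter < iterations; iter++, shift += 8 )
    {
        size_t counts[Radix] = {};

        // Histogram the current byte
        for( size_t i = 0; i < length; i++ )
            counts[(input[i] >> shift) & 0xFF]++;

        // Exclusive prefix sum: counts[b] becomes bucket b's write offset
        size_t offset = 0;
        for( uint32_t b = 0; b < Radix; b++ )
        {
            const size_t c = counts[b];
            counts[b] = offset;
            offset += c;
        }

        // Stable scatter into the other buffer
        for( size_t i = 0; i < length; i++ )
            tmp[counts[(input[i] >> shift) & 0xFF]++] = input[i];

        std::swap( input, tmp );
    }
    // With an odd pass count (e.g. 5), the sorted data ends up in the buffer
    // the caller passed as 'tmp', hence the swapped buffer usage in this diff.
}
```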
diff --git a/src/memplot/FxSort.h b/src/memplot/FxSort.h
index 25ba3f29..06bfce06 100644
--- a/src/memplot/FxSort.h
+++ b/src/memplot/FxSort.h
@@ -42,7 +42,7 @@ inline void SortFx(
     GenSortKey( pool, length, sortKey );

     // Sort on y along with the sort key
-    RadixSort256::SortWithKey( pool,
+    RadixSort256::SortYWithKey( pool,
         yBuffer, yTmp,
         sortKey, sortKeyTmp,
         length
diff --git a/src/memplot/MemPhase1.cpp b/src/memplot/MemPhase1.cpp
index 115e3f59..a4d3cbe9 100644
--- a/src/memplot/MemPhase1.cpp
+++ b/src/memplot/MemPhase1.cpp
@@ -8,7 +8,7 @@
 #include
 #include "DbgHelper.h"

-
+#include

 bool DbgVerifySortedY( const uint64 entryCount, const uint64* yBuffer );

@@ -28,6 +28,12 @@
 ///
 struct F1GenJob
 {
+    uint node;
+    uint threadCount;
+    uint cpuId;
+    uint startPage;
+    uint pageCount;
+
     const byte* key;

     uint32 blockCount;

@@ -73,6 +79,7 @@ struct FpFxJob

 /// Internal Funcs forwards-declares
 void F1JobThread( F1GenJob* job );
+void F1NumaJobThread( F1GenJob* job );
 void FpScanThread( kBCJob* job );
 void FpPairThread( kBCJob* job );

@@ -189,11 +196,15 @@ uint64 MemPhase1::GenerateF1()
     ASSERT( entriesPerBlock * sizeof( uint32 ) == CHACHA_BLOCK_SIZE ); // Must fit exactly within a block

     // Generate all of the y values to a metabuffer first
-    byte*   blocks  = (byte*)cx.metaBuffer1;
+    byte*   blocks  = (byte*)cx.yBuffer0;
     uint64* yBuffer = cx.yBuffer0;
     uint32* xBuffer = cx.t1XBuffer;
+    uint64* yTmp    = cx.metaBuffer1;
+    uint32* xTmp    = (uint32*)(yTmp + totalEntries);
+
+    ASSERT( numThreads <= MAX_THREADS );

-    ASSERT( numThreads < MAX_THREADS );
+    // const NumaInfo* numa = SysHost::GetNUMAInfo();

     // Gen all raw f1 values
     {
@@ -205,21 +216,59 @@
         uint64 blockOffset = i * blocksPerThread * CHACHA_BLOCK_SIZE;

         F1GenJob& job = jobs[i];
+        job.cpuId       = i;
+        job.threadCount = numThreads;
+
         job.key         = key;
         job.blockCount  = blocksPerThread;
         job.entryCount  = (uint32)entriesPerThread;
         job.x           = (uint32)offset;
         job.blocks      = blocks + blockOffset;
-        job.yBuffer     = yBuffer + offset;
-        job.xBuffer     = xBuffer + offset;
+        job.yBuffer     = yTmp + offset;
+        job.xBuffer     = xTmp + offset;
     }

     jobs[numThreads-1].entryCount += trailingEntries;
     jobs[numThreads-1].blockCount += trailingBlocks;

+    // Initialize NUMA pages
+    // if( numa )
+    // {
+    //     const uint pageSize       = (uint)SysHost::GetPageSize();
+    //     const uint blocksPerPage  = (uint)( pageSize / CHACHA_BLOCK_SIZE );
+    //     const uint pageCount      = (uint)( totalBlocks / blocksPerPage );
+    //     const uint pagesPerThread = pageCount / numThreads;
+    //     const uint nodeStride     = numa->nodeCount;
+
+    //     for( uint i = 0; i < numa->nodeCount; i++ )
+    //     {
+    //         const auto& nodeCpus = numa->cpuIds[i];
+    //         const uint  cpuCount = nodeCpus.length;
+
+    //         // #TODO: Remove this. For now we hard-code it to
+    //         //        the node of the first page.
+    //         // const int pageOffset = (i + 1) & 1;
+
+    //         for( uint j = 0; j < cpuCount; j++ )
+    //         {
+    //             const uint cpuId = nodeCpus[j];
+
+    //             auto& job = jobs[cpuId];
+
+    //             job.node      = i;
+    //             job.startPage = nodeStride * j;
+    //             job.pageCount = pagesPerThread;
+    //             job.blocks    = blocks;
+    //             job.yBuffer   = yBuffer;
+    //             job.xBuffer   = xBuffer;
+    //         }
+    //     }
+    // }
+
     Log::Line( "Generating F1..." );
     auto timeStart = TimerBegin();
+    // cx.threadPool->RunJob( numa ? F1NumaJobThread : F1JobThread, jobs, numThreads );
     cx.threadPool->RunJob( F1JobThread, jobs, numThreads );
     double elapsed = TimerEnd( timeStart );

@@ -229,12 +278,9 @@
     Log::Line( "Sorting F1..." );
     auto timeStart = TimerBegin();

-    uint64* yTmp = cx.metaBuffer1;
-    uint32* xTmp = (uint32*)(yTmp + totalEntries);
-
-    RadixSort256::SortWithKey( *cx.threadPool,
-        yBuffer, yTmp,
-        xBuffer, xTmp,
+    RadixSort256::SortYWithKey( *cx.threadPool,
+        yTmp, yBuffer,
+        xTmp, xBuffer,
         totalEntries );

     double elapsed = TimerEnd( timeStart );

@@ -395,14 +441,15 @@ uint64 MemPhase1::FpComputeSingleTable(
     // Use table 7's buffers as a temporary buffer
     uint32* sortKey    = cx.t7YBuffer;
-    uint64* yTmp       = metaBuffer.write;
+    // uint64* yTmp       = metaBuffer.write;
     uint32* sortKeyTmp = (uint32*)( metaBuffer.write + ENTRIES_PER_TABLE );

     // Use the output metabuffer for now as
     // the temporary sortkey buffer.
     SortFx( *cx.threadPool, pairCount,
-        (uint64*)yBuffer.read, yTmp,
-        sortKey, sortKeyTmp
+        (uint64*)yBuffer.read, yBuffer.write,
+        sortKeyTmp, sortKey
     );
+    yBuffer.Swap();

     // DbgVerifyPairsKBCGroups( pairCount, yBuffer.write, unsortedPairBuffer );

@@ -414,10 +461,6 @@
     // DbgVerifyPairsKBCGroups( pairCount, yBuffer.write, pairBuffer );

-    // #NOTE: We don't swap the y buffer because
-    // sorting leaves yBuffer.read sorted in its places.
-    // So it is already swapped for the next table to read it.
-
     // Use the sorted metabuffer as the read buffer for the next table
     metaBuffer.Swap();

@@ -489,6 +532,116 @@ void F1JobThread( F1GenJob* job )
 }

+//-----------------------------------------------------------
+void F1NumaJobThread( F1GenJob* job )
+{
+    // const NumaInfo* numa = SysHost::GetNUMAInfo();
+
+    const uint32 pageSize = (uint32)SysHost::GetPageSize();
+
+    // const uint k = _K;
+    const size_t CHACHA_BLOCK_SIZE = kF1BlockSizeBits / 8;
+    // const uint64 totalEntries   = 1ull << k;
+    const uint32 entriesPerBlock  = (uint32)( CHACHA_BLOCK_SIZE / sizeof( uint32 ) );
+    const uint32 blocksPerPage    = pageSize / CHACHA_BLOCK_SIZE;
+    const uint32 entriesPerPage32 = entriesPerBlock * blocksPerPage;
+    const uint32 entriesPerPage64 = entriesPerPage32 / 2;
+
+    const uint pageOffset = job->startPage;
+    const uint pageCount  = job->pageCount;
+
+    const uint32 pageStride    = job->threadCount;
+    const uint32 blockStride   = pageSize * pageStride;
+    const uint32 entryStride32 = entriesPerPage32 * pageStride;
+    const uint32 entryStride64 = entriesPerPage64 * pageStride;
+
+    // #TODO: Get proper offset depending on node count. Or, figure out if we can always have
+    //        the pages of the buffers simply start at the same location
+    const uint32 blockStartPage = SysHost::NumaGetNodeFromPage( job->blocks  ) == job->node ? 0 : 1;
+    const uint32 yStartPage     = SysHost::NumaGetNodeFromPage( job->yBuffer ) == job->node ? 0 : 1;
+    const uint32 xStartPage     = SysHost::NumaGetNodeFromPage( job->xBuffer ) == job->node ? 0 : 1;
+
+    // const uint64 x = job->x;
+    const uint64 x = (blockStartPage + pageOffset) * entriesPerPage32;
+    // const uint32 xStride = pageStride * entriesPerPage;
+
+    byte*   blockBytes = job->blocks + (blockStartPage + pageOffset) * pageSize;
+    uint32* blocks     = (uint32*)blockBytes;
+
+    uint64* yBuffer = job->yBuffer + ( yStartPage + pageOffset ) * entriesPerPage64;
+    uint32* xBuffer = job->xBuffer + ( xStartPage + pageOffset ) * entriesPerPage32;
+
+    chacha8_ctx chacha;
+    ZeroMem( &chacha );
+
+    chacha8_keysetup( &chacha, job->key, 256, NULL );
+
+    for( uint64 p = 0; p < pageCount/4; p+=4 )
+    {
+        ASSERT( SysHost::NumaGetNodeFromPage( blockBytes ) == job->node );
+
+        // blockBytes
+        // Which block are we generating?
+        const uint64 blockIdx = ( x + p * entryStride32 ) * _K / kF1BlockSizeBits;
+
+        chacha8_get_keystream( &chacha, blockIdx,                   blocksPerPage, blockBytes );
+        chacha8_get_keystream( &chacha, blockIdx + blocksPerPage,   blocksPerPage, blockBytes + blockStride );
+        chacha8_get_keystream( &chacha, blockIdx + blocksPerPage*2, blocksPerPage, blockBytes + blockStride * 2 );
+        chacha8_get_keystream( &chacha, blockIdx + blocksPerPage*3, blocksPerPage, blockBytes + blockStride * 3 );
+        blockBytes += blockStride * 4;
+    }
+
+    for( uint64 p = 0; p < pageCount; p++ )
+    {
+        ASSERT( SysHost::NumaGetNodeFromPage( yBuffer ) == job->node );
+        ASSERT( SysHost::NumaGetNodeFromPage( blocks  ) == job->node );
+
+        const uint64 curX = x + p * entryStride32;
+
+        for( uint64 i = 0; i < entriesPerPage32; i++ )
+        {
+            // chacha output is treated as big endian, therefore swap, as required by chiapos
+            const uint64 y = Swap32( blocks[i] );
+            yBuffer[i] = ( y << kExtraBits ) | ( (curX+i) >> (_K - kExtraBits) );
+        }
+
+        // for( uint64 i = 0; i < 64; i++ )
+        // {
+        //     yBuffer[0] = ( Swap32( blocks[0] ) << kExtraBits ) | ( (curX+0) >> (_K - kExtraBits) );
+        //     yBuffer[1] = ( Swap32( blocks[1] ) << kExtraBits ) | ( (curX+1) >> (_K - kExtraBits) );
+        //     yBuffer[2] = ( Swap32( blocks[2] ) << kExtraBits ) | ( (curX+2) >> (_K - kExtraBits) );
+        //     yBuffer[3] = ( Swap32( blocks[3] ) << kExtraBits ) | ( (curX+3) >> (_K - kExtraBits) );
+        //     yBuffer[4] = ( Swap32( blocks[4] ) << kExtraBits ) | ( (curX+4) >> (_K - kExtraBits) );
+        //     yBuffer[5] = ( Swap32( blocks[5] ) << kExtraBits ) | ( (curX+5) >> (_K - kExtraBits) );
+        //     yBuffer[6] = ( Swap32( blocks[6] ) << kExtraBits ) | ( (curX+6) >> (_K - kExtraBits) );
+        //     yBuffer[7] = ( Swap32( blocks[7] ) << kExtraBits ) | ( (curX+7) >> (_K - kExtraBits) );
+
+        //     yBuffer += 8;
+        //     blocks  += 8;
+        // }
+
+        // #TODO: This is wrong. We need to fill more y's before we go to the next block page.
+        yBuffer += entryStride64;
+        blocks  += entryStride32;
+    }
+
+    // Gen the x that generated the y
+    for( uint64 p = 0; p < pageCount; p++ )
+    {
+        ASSERT( SysHost::NumaGetNodeFromPage( xBuffer ) == job->node );
+
+        const uint32 curX = (uint32)(x + p * entryStride32);
+
+        for( uint32 i = 0; i < entriesPerPage32; i++ )
+            xBuffer[i] = curX + i;
+
+        xBuffer += entryStride32;
+    }
+
+    // #TODO: Process last part
+}
+

 ///
 /// kBC groups & matching
 ///
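`F1NumaJobThread` assumes page-interleaved buffers: with round-robin interleaving, page `p` of an allocation resides on node `p % nodeCount`, so a thread pinned to node `n` probes the first page with `NumaGetNodeFromPage`, offsets to the first node-local page, and then strides by the total thread count so every page it touches stays local (the asserts check exactly this). A standalone toy of that ownership arithmetic, assuming a hypothetical 2-node machine with 2 cpus per node:

```cpp
#include <cstdio>

int main()
{
    // Assumed topology for illustration: 2 nodes, 2 cpus each.
    const unsigned nodeCount   = 2;
    const unsigned cpusPerNode = 2;
    const unsigned threadCount = nodeCount * cpusPerNode;
    const unsigned pageCount   = 16;

    // Interleaved allocation: page p lives on node p % nodeCount.
    for( unsigned node = 0; node < nodeCount; node++ )
    for( unsigned j = 0; j < cpusPerNode; j++ )
    {
        // Mirrors the (commented-out) job setup above: startPage = nodeStride * j,
        // shifted so the first page visited is resident on this thread's node.
        const unsigned startPage = node + nodeCount * j;

        printf( "node %u thread %u: pages", node, j );

        for( unsigned p = startPage; p < pageCount; p += threadCount )
        {
            // Every page visited is node-local: p % nodeCount == node.
            printf( " %u", p );
        }
        printf( "\n" );
    }
    return 0;
}
```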
diff --git a/src/memplot/MemPlotter.cpp b/src/memplot/MemPlotter.cpp
index 8f777713..338730da 100644
--- a/src/memplot/MemPlotter.cpp
+++ b/src/memplot/MemPlotter.cpp
@@ -21,17 +21,11 @@ MemPlotter::MemPlotter( uint threadCount, bool warmStart, bool noNUMA )
     if( numa && numa->nodeCount < 2 )
         numa = nullptr;
-
-    if( numa )
-    {
-        if( !SysHost::NumaSetThreadInterleavedMode() )
-            Log::Error( "Warning: Failed to set NUMA interleaved mode." );
-    }

     _context.threadCount = threadCount;

     // Create a thread pool
-    _context.threadPool = new ThreadPool( threadCount, ThreadPool::Mode::Greedy );
+    _context.threadPool = new ThreadPool( threadCount, ThreadPool::Mode::Fixed ); //ThreadPool::Mode::Greedy );

     // Allocate buffers
     {
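Dropping the process-wide interleave policy and switching the pool to `Mode::Fixed` appears intended to make thread index and cpu id correspond deterministically, which the NUMA work division depends on: `cpuToNodeMap[i]` is only meaningful if thread `i` actually runs on cpu `i`. The affinity code itself is not in this diff; a generic Linux sketch of pinning the calling thread to one cpu (an assumed mechanism, not bladebit's implementation):

```cpp
#ifndef _GNU_SOURCE
    #define _GNU_SOURCE
#endif
#include <pthread.h>
#include <sched.h>
#include <cstdio>

// Bind the calling thread to a single cpu id. Once bound, a cpu->node table
// like NumaInfo::cpuToNodeMap tells us which NUMA node the thread runs on.
static int PinSelfToCpu( int cpuId )
{
    cpu_set_t cpuSet;
    CPU_ZERO( &cpuSet );
    CPU_SET( cpuId, &cpuSet );

    // Returns 0 on success.
    return pthread_setaffinity_np( pthread_self(), sizeof( cpuSet ), &cpuSet );
}

int main()
{
    if( PinSelfToCpu( 0 ) == 0 )
        printf( "pinned to cpu 0\n" );

    return 0;
}
```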
diff --git a/src/platform/linux/SysHost_Linux.cpp b/src/platform/linux/SysHost_Linux.cpp
index 34bb356c..8cc299e5 100644
--- a/src/platform/linux/SysHost_Linux.cpp
+++ b/src/platform/linux/SysHost_Linux.cpp
@@ -9,9 +9,9 @@
 #include
 #include

-#if _DEBUG
+// #if _DEBUG
     #include "util/Log.h"
-#endif
+// #endif

 std::atomic<bool> _crashed = false;

@@ -92,7 +92,7 @@ void SysHost::VirtualFree( void* ptr )

     const size_t pageSize = GetPageSize();

-    byte* realPtr = ((byte*)ptr) - pageSize;
+    byte* realPtr = ((byte*)ptr) - pageSize;
     const size_t size = *((size_t*)realPtr);

     munmap( realPtr, size );

@@ -295,16 +295,30 @@ const NumaInfo* SysHost::GetNUMAInfo()

         // #TODO BUG: This is a memory leak,
         // but we're getting crashes releasing it or
-        // using it multiple times with numa_node_to_cpus on
-        // a signle allocations.
+        // using it multiple times with numa_node_to_cpus on a single allocation.
         // Fix it. (Not fatal as it is a small allocation, and this has re-entry protection)
         // numa_free_cpumask( cpuMask );
     }

+    // Map cpus to nodes
+    if( nodeCount > 255 )
+        Fatal( "Too many NUMA nodes." );
+
+    byte* cpuMap = (byte*)malloc( totalCpuCount );
+
+    for( uint i = 0; i < nodeCount; i++ )
+    {
+        const auto& cpuId = cpuIds[i];
+
+        for( uint j = 0; j < cpuId.length; j++ )
+            cpuMap[cpuId[j]] = (byte)i;
+    }
+
     // Save instance
-    _info.nodeCount = nodeCount;
-    _info.cpuCount  = totalCpuCount;
-    _info.cpuIds    = cpuIds;
+    _info.nodeCount    = nodeCount;
+    _info.cpuCount     = totalCpuCount;
+    _info.cpuIds       = cpuIds;
+    _info.cpuToNodeMap = cpuMap;

     info = &_info;
 }

@@ -364,9 +378,33 @@ bool SysHost::NumaSetMemoryInterleavedMode( void* ptr, size_t size )
     if( r )
     {
         int err = errno;
-        Log::Error( "Warning: set_mempolicy() failed with error %d (0x%x).", err, err );
+        Log::Error( "Warning: mbind() failed with error %d (0x%x).", err, err );
     }
 #endif

     return r == 0;
+}
+
+//-----------------------------------------------------------
+int SysHost::NumaGetNodeFromPage( void* ptr )
+{
+    const NumaInfo* numa = GetNUMAInfo();
+    if( !numa )
+        return -1;
+
+    int node = -1;
+    int r = numa_move_pages( 0, 1, &ptr, nullptr, &node, 0 );
+
+    if( r )
+    {
+        int err = errno;
+        Log::Error( "Warning: numa_move_pages() failed with error %d (0x%x).", err, err );
+    }
+    else if( node < 0 )
+    {
+        int err = std::abs( node );
+        Log::Error( "Warning: numa_move_pages() node retrieval failed with error %d (0x%x).", err, err );
+    }
+
+    return node;
 }
\ No newline at end of file
diff --git a/src/platform/macos/SysHost_Macos.cpp b/src/platform/macos/SysHost_Macos.cpp
index 3141823a..52991e84 100644
--- a/src/platform/macos/SysHost_Macos.cpp
+++ b/src/platform/macos/SysHost_Macos.cpp
@@ -53,8 +53,10 @@ void* SysHost::VirtualAlloc( size_t size, bool initialize )
     // #TODO: Use vm_allocate
     // #TODO: Consider initialize

+    const size_t pageSize = (size_t)getpagesize();
+
     // Align size to page boundary
-    size = RoundUpToNextBoundary( size, (int)GetPageSize() );
+    size = RoundUpToNextBoundary( size, (int)pageSize );

     void* ptr = mmap( NULL, size,
         PROT_READ | PROT_WRITE,
@@ -72,6 +74,22 @@
         return nullptr;
     }
+    else if( initialize )
+    {
+        // Initialize memory
+        // (since physical pages are not allocated until the actual pages are accessed)
+        byte* page = (byte*)ptr;
+
+        const size_t pageCount = size / pageSize;
+        const byte*  endPage   = page + pageCount * pageSize;
+
+        do
+        {
+            *page = 0;
+            page += pageSize;
+        } while( page < endPage );
+    }

     return ptr;
 }
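`NumaGetNodeFromPage` relies on a documented behavior of `move_pages(2)`: when the target-node array is null, no pages are moved and the current node of each page (or a negative errno) is written into the status array. That is also why the header note requires the page to be faulted first; an untouched page has no residency yet and reports `-ENOENT`. A standalone usage sketch (assumes libnuma headers are installed; link with `-lnuma`):

```cpp
#include <numa.h>
#include <numaif.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>

int main()
{
    if( numa_available() < 0 )
    {
        fprintf( stderr, "No NUMA support on this system.\n" );
        return 1;
    }

    const size_t pageSize = (size_t)sysconf( _SC_PAGESIZE );
    void* buffer = aligned_alloc( pageSize, pageSize );

    // Fault the page first; an unfaulted page reports -ENOENT in status.
    memset( buffer, 0, pageSize );

    void* pages[1]  = { buffer };
    int   status[1] = { -1 };

    // A null 'nodes' array turns move_pages into a residency query.
    if( numa_move_pages( 0, 1, pages, nullptr, status, 0 ) == 0 && status[0] >= 0 )
        printf( "page resides on node %d\n", status[0] );
    else
        printf( "query failed (status %d)\n", status[0] );

    free( buffer );
    return 0;
}
```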
diff --git a/src/test/test_main.cpp b/src/test/test_main.cpp
index 906f5c99..9d117c6d 100644
--- a/src/test/test_main.cpp
+++ b/src/test/test_main.cpp
@@ -4,7 +4,7 @@ void TestNuma( int argc, const char* argv[] );
 //-----------------------------------------------------------
 int main( int argc, const char* argv[] )
 {
-    // TestNuma( argc-1, argv+1 );
+    TestNuma( argc-1, argv+1 );

     return 0;
 }
\ No newline at end of file
diff --git a/src/threading/Thread.cpp b/src/threading/Thread.cpp
index 772a51e6..a1eafbe9 100644
--- a/src/threading/Thread.cpp
+++ b/src/threading/Thread.cpp
@@ -10,24 +10,47 @@ typedef void* (*PthreadFunc)( void* param );

 //-----------------------------------------------------------
 Thread::Thread( size_t stackSize )
 {
-    // Configure stack size
-    if( stackSize < 1024 * 4 )
+    Init( nullptr, stackSize );
+}
+
+//-----------------------------------------------------------
+Thread::Thread( void* stack, size_t stackSize )
+{
+    if( !stack )
+        Fatal( "Null stack." );
+
+    Init( stack, stackSize );
+}
+
+//-----------------------------------------------------------
+void Thread::Init( void* stack, size_t stackSize )
+{
+    if( !stack && stackSize < 1024 * 4 )
         Fatal( "Thread stack size is too small." );
-
-    // Align to 8 bytes
-    stackSize = RoundUpToNextBoundary( stackSize, 8 );

 #if PLATFORM_IS_UNIX
     _state.store( ThreadState::ReadyToRun, std::memory_order_release );

-    pthread_attr_t attr;
+    pthread_attr_t attr;

     int r = pthread_attr_init( &attr );
     if( r ) Fatal( "pthread_attr_init() failed." );
+
+    if( stack )
+    {
+        r = pthread_attr_setstack( &attr, stack, stackSize );
+        if( r ) Fatal( "pthread_attr_setstack() failed." );
+    }
+    else
+    {
+        const size_t pageSize = SysHost::GetPageSize();
+        stackSize = RoundUpToNextBoundary( stackSize, (int)pageSize );
+
+        r = pthread_attr_setstacksize( &attr, stackSize );
+        if( r ) Fatal( "pthread_attr_setstacksize() failed." );
+    }

-    r = pthread_attr_setstacksize( &attr, stackSize );
-    if( r ) Fatal( "pthread_attr_setstacksize() failed." );

     // Initialize suspended mode signal
     r = pthread_cond_init( &_launchCond, NULL );
@@ -236,4 +259,5 @@ void* Thread::ThreadStarter( Thread* t )
 #endif

     return nullptr;
-}
\ No newline at end of file
+}
+
diff --git a/src/threading/Thread.h b/src/threading/Thread.h
index 51940ba3..92a58b90 100644
--- a/src/threading/Thread.h
+++ b/src/threading/Thread.h
@@ -8,6 +8,7 @@ class Thread
 {
 public:
+    Thread( void* stack, size_t stackSize );
     Thread( size_t stackSize );
     Thread();
     ~Thread();
@@ -25,6 +26,8 @@ class Thread
     bool HasExited() const;

 private:
+    void Init( void* stack, size_t stackSize );
+
     static void* ThreadStarter( Thread* thread );

 private:
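The stack-accepting constructor path uses `pthread_attr_setstack`, which hands the new thread a caller-owned stack: pthreads never frees it, the memory must be suitably (page) aligned, and the size must be at least `PTHREAD_STACK_MIN`. A minimal standalone sketch of the mechanism, with plain `mmap` standing in for the NUMA-bound `SysHost::VirtualAlloc` + `NumaAssignPages` path the pool uses:

```cpp
#include <pthread.h>
#include <limits.h>
#include <sys/mman.h>
#include <cstdio>

static void* Entry( void* )
{
    puts( "running on a custom stack" );
    return nullptr;
}

int main()
{
    const size_t stackSize = 8 * 1024 * 1024;   // must be >= PTHREAD_STACK_MIN

    // Page-aligned, page-backed stack; a NUMA-aware allocator would bind
    // these pages to the target thread's node before the thread starts.
    void* stack = mmap( nullptr, stackSize, PROT_READ | PROT_WRITE,
                        MAP_PRIVATE | MAP_ANONYMOUS, -1, 0 );
    if( stack == MAP_FAILED )
        return 1;

    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setstack( &attr, stack, stackSize );

    pthread_t thread;
    if( pthread_create( &thread, &attr, Entry, nullptr ) == 0 )
        pthread_join( thread, nullptr );

    pthread_attr_destroy( &attr );
    munmap( stack, stackSize );   // the pthread library never frees user stacks
    return 0;
}
```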
diff --git a/src/threading/ThreadPool.cpp b/src/threading/ThreadPool.cpp
index f89b20a0..a41a0c03 100644
--- a/src/threading/ThreadPool.cpp
+++ b/src/threading/ThreadPool.cpp
@@ -14,9 +14,15 @@ ThreadPool::ThreadPool( uint threadCount, Mode mode )
     if( threadCount < 1 )
         Fatal( "threadCount must be greater than 0." );

-    _threads    = new Thread [threadCount];
+    _threads    = (Thread*)malloc( sizeof(Thread) * threadCount );
    _threadData = new ThreadData[threadCount];

+    // #TODO: Allocate a stack for NUMA systems on greedy mode?
+    const NumaInfo* numa = SysHost::GetNUMAInfo();
+    const size_t pageSize   = SysHost::GetPageSize();
+    const bool   allocStack = numa && numa->nodeCount > 1 && mode == Mode::Fixed;
+    const size_t stackSize  = 8 MB;
+
     auto threadRunner = mode == Mode::Fixed ? FixedThreadRunner : GreedyThreadRunner;

     for( uint i = 0; i < threadCount; i++ )
@@ -24,10 +30,27 @@
         _threadData[i].index = (int)i;
         _threadData[i].cpuId = i;
         _threadData[i].pool  = this;
-
-        Thread& t = _threads[i];
-        t.Run( threadRunner, &_threadData[i] );
+
+        Thread* t;
+
+        if( allocStack )
+        {
+            // Allocate a stack on the thread's NUMA node
+            void* stack = SysHost::VirtualAlloc( stackSize, false );
+            if( !stack )
+                Fatal( "Failed to allocate stack for thread." );
+
+            const uint node = numa->cpuToNodeMap[i]; ASSERT( node < numa->nodeCount );
+            SysHost::NumaAssignPages( stack, stackSize, node );
+
+            t = new ( (void*)(_threads+i) ) Thread( stack, stackSize );
+        }
+        else
+        {
+            t = new ( (void*)(_threads+i) ) Thread( stackSize );
+        }
+
+        t->Run( threadRunner, &_threadData[i] );
     }
 }

@@ -53,7 +76,10 @@ ThreadPool::~ThreadPool()
     // #TODO: Signal thread for exit.
     // #TODO: Wait for all threads to exit

-    delete[] _threads;
+    for( uint i = 0; i < _threadCount; i++ )
+        _threads[i].~Thread();
+
+    free( _threads );
     delete[] _threadData;

     _threads    = nullptr;

@@ -147,6 +173,13 @@ void ThreadPool::FixedThreadRunner( void* tParam )

     const uint index = (uint)d.index;

+    #if _DEBUG
+        const NumaInfo* numa = SysHost::GetNUMAInfo();
+        uint node = numa->cpuToNodeMap[d.cpuId];
+        ASSERT( node >= 0 && node < numa->nodeCount );
+        ASSERT( SysHost::NumaGetNodeFromPage( (void*)&index ) == node );
+    #endif
+
     std::atomic<bool>& exitSignal = pool._exitSignal;
     Semaphore& poolSignal = pool._poolSignal;
     Semaphore& jobSignal  = d.jobSignal;
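Because `Thread` now needs per-instance constructor arguments (a node-local stack for some threads, a plain size for others), the pool trades `new Thread[threadCount]` for raw `malloc` storage plus placement new, and the destructor mirrors that with explicit `~Thread()` calls before `free`. A generic sketch of the pattern, with a hypothetical `Widget` standing in for `Thread`:

```cpp
#include <cstdlib>
#include <new>

struct Widget
{
    explicit Widget( int id ) : id( id ) {}
    ~Widget() {}
    int id;
};

int main()
{
    const size_t count = 4;

    // Raw, uninitialized storage: no constructors run yet.
    Widget* items = (Widget*)malloc( sizeof( Widget ) * count );

    // new[] would force the default constructor on every element;
    // placement new lets each element take different arguments.
    for( size_t i = 0; i < count; i++ )
        new( (void*)(items + i) ) Widget( (int)i );

    // free() does not run destructors, so invoke them explicitly first.
    for( size_t i = 0; i < count; i++ )
        items[i].~Widget();

    free( items );
    return 0;
}
```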