From 7c8c456a4331c35544a2656571e1d87df54d90e2 Mon Sep 17 00:00:00 2001
From: Maxim Egorushkin
Date: Mon, 12 Feb 2024 22:04:08 +0000
Subject: [PATCH] Stateful allocator support added.

Resolves #45.
---
 include/atomic_queue/atomic_queue.h | 105 ++++++++++++++++------------
 src/huge_pages.h                    |  12 +++-
 2 files changed, 69 insertions(+), 48 deletions(-)

diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h
index 432b914..94d7a56 100644
--- a/include/atomic_queue/atomic_queue.h
+++ b/include/atomic_queue/atomic_queue.h
@@ -420,8 +420,9 @@ class AtomicQueue2 : public AtomicQueueCommon
          T NIL = details::nil<T>(),
          bool MAXIMIZE_THROUGHPUT = true,
          bool TOTAL_ORDER = false,
          bool SPSC = false>
-class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-                     private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>> {
+class AtomicQueueB : private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>,
+                     public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
+    using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;
     using Base = AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
     friend Base;
@@ -429,8 +430,6 @@ class AtomicQueueB : public AtomicQueueCommon
     static constexpr bool total_order_ = TOTAL_ORDER;
     static constexpr bool spsc_ = SPSC;
     static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT;

-    using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;
-
     static constexpr auto ELEMENTS_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(std::atomic<T>);
     static_assert(ELEMENTS_PER_CACHE_LINE, "Unexpected ELEMENTS_PER_CACHE_LINE.");
@@ -454,11 +453,13 @@ class AtomicQueueB : public AtomicQueueCommon
 public:
     using value_type = T;
+    using allocator_type = A;

     // The special member functions are not thread-safe.

-    AtomicQueueB(unsigned size)
-        : size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
+    AtomicQueueB(unsigned size, A const& allocator = A{})
+        : AllocatorElements(allocator)
+        , size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
         , elements_(AllocatorElements::allocate(size_)) {
         assert(std::atomic<T>{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types.
         for(auto p = elements_, q = elements_ + size_; p < q; ++p)
@@ -466,13 +467,11 @@ class AtomicQueueB : public AtomicQueueCommon
     }

     AtomicQueueB(AtomicQueueB&& b) noexcept
-        : Base(static_cast<Base&&>(b))
-        , AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , size_(b.size_)
-        , elements_(b.elements_) {
-        b.size_ = 0;
-        b.elements_ = 0;
-    }
+        : AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
+        , Base(static_cast<Base&&>(b))
+        , size_(std::exchange(b.size_, 0))
+        , elements_(std::exchange(b.elements_, nullptr))
+    {}

     AtomicQueueB& operator=(AtomicQueueB&& b) noexcept {
         b.swap(*this);
@@ -484,10 +483,14 @@ class AtomicQueueB : public AtomicQueueCommon
             AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that.
     }

+    A get_allocator() const noexcept {
+        return *this;
+    }
+
     void swap(AtomicQueueB& b) noexcept {
         using std::swap;
-        this->Base::swap(b);
         swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
+        Base::swap(b);
         swap(size_, b.size_);
         swap(elements_, b.elements_);
     }
@@ -500,27 +503,25 @@ class AtomicQueueB : public AtomicQueueCommon
 };

 template<class T,
          class A = std::allocator<T>,
          bool MAXIMIZE_THROUGHPUT = true,
          bool TOTAL_ORDER = false,
          bool SPSC = false>
-class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-                      private A,
-                      private std::allocator_traits<A>::template rebind_alloc<std::atomic<unsigned char>> {
+class AtomicQueueB2 : private std::allocator_traits<A>::template rebind_alloc<unsigned char>,
+                      public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
+    using StorageAllocator = typename std::allocator_traits<A>::template rebind_alloc<unsigned char>;
     using Base = AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
     using State = typename Base::State;
+    using AtomicState = std::atomic<State>;
     friend Base;

     static constexpr bool total_order_ = TOTAL_ORDER;
     static constexpr bool spsc_ = SPSC;
     static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT;

-    using AllocatorElements = A;
-    using AllocatorStates = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<unsigned char>>;
-
     // AtomicQueueCommon members are stored into by readers and writers.
     // Allocate these immutable members on another cache line which never gets invalidated by stores.
     alignas(CACHE_LINE_SIZE) unsigned size_;
-    std::atomic<unsigned char>* states_;
+    AtomicState* states_;
     T* elements_;

-    static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(State);
+    static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(AtomicState);
     static_assert(STATES_PER_CACHE_LINE, "Unexpected STATES_PER_CACHE_LINE.");

     static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits<STATES_PER_CACHE_LINE>::value;
@@ -537,34 +538,43 @@ class AtomicQueueB2 : public AtomicQueueCommon
         Base::template do_push_any(std::forward<U>(element), states_[index], elements_[index]);
     }

+    template<class U>
+    U* allocate_() {
+        U* p = reinterpret_cast<U*>(StorageAllocator::allocate(size_ * sizeof(U)));
+        assert(reinterpret_cast<uintptr_t>(p) % alignof(U) == 0); // Allocated storage must be suitably aligned for U.
+        return p;
+    }
+
+    template<class U>
+    void deallocate_(U* p) noexcept {
+        StorageAllocator::deallocate(reinterpret_cast<unsigned char*>(p), size_ * sizeof(U)); // TODO: This must be noexcept, static_assert that.
+    }
+
 public:
     using value_type = T;
+    using allocator_type = A;

     // The special member functions are not thread-safe.

-    AtomicQueueB2(unsigned size)
-        : size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
-        , states_(AllocatorStates::allocate(size_))
-        , elements_(AllocatorElements::allocate(size_)) {
+    AtomicQueueB2(unsigned size, A const& allocator = A{})
+        : StorageAllocator(allocator)
+        , size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
+        , states_(allocate_<AtomicState>())
+        , elements_(allocate_<T>()) {
         for(auto p = states_, q = states_ + size_; p < q; ++p)
             p->store(Base::EMPTY, X);
-
-        AllocatorElements& ae = *this;
+        A a = get_allocator();
         for(auto p = elements_, q = elements_ + size_; p < q; ++p)
-            std::allocator_traits<AllocatorElements>::construct(ae, p);
+            std::allocator_traits<A>::construct(a, p);
     }

     AtomicQueueB2(AtomicQueueB2&& b) noexcept
-        : Base(static_cast<Base&&>(b))
-        , AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , AllocatorStates(static_cast<AllocatorStates&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , size_(b.size_)
-        , states_(b.states_)
-        , elements_(b.elements_) {
-        b.size_ = 0;
-        b.states_ = 0;
-        b.elements_ = 0;
-    }
+        : StorageAllocator(static_cast<StorageAllocator&&>(b)) // TODO: This must be noexcept, static_assert that.
+        , Base(static_cast<Base&&>(b))
+        , size_(std::exchange(b.size_, 0))
+        , states_(std::exchange(b.states_, nullptr))
+        , elements_(std::exchange(b.elements_, nullptr))
+    {}

     AtomicQueueB2& operator=(AtomicQueueB2&& b) noexcept {
         b.swap(*this);
@@ -573,19 +583,22 @@ class AtomicQueueB2 : public AtomicQueueCommon
     ~AtomicQueueB2() noexcept {
         if(elements_) {
-            AllocatorElements& ae = *this;
+            A a = get_allocator();
             for(auto p = elements_, q = elements_ + size_; p < q; ++p)
-                std::allocator_traits<AllocatorElements>::destroy(ae, p);
-            AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that.
-            AllocatorStates::deallocate(states_, size_); // TODO: This must be noexcept, static_assert that.
+                std::allocator_traits<A>::destroy(a, p);
+            deallocate_(elements_);
+            deallocate_(states_);
         }
     }

+    A get_allocator() const noexcept {
+        return *this;
+    }
+
     void swap(AtomicQueueB2& b) noexcept {
         using std::swap;
-        this->Base::swap(b);
-        swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
-        swap(static_cast<AllocatorStates&>(*this), static_cast<AllocatorStates&>(b));
+        swap(static_cast<StorageAllocator&>(*this), static_cast<StorageAllocator&>(b));
+        Base::swap(b);
         swap(size_, b.size_);
         swap(states_, b.states_);
         swap(elements_, b.elements_);
     }
diff --git a/src/huge_pages.h b/src/huge_pages.h
index 8e1e9ae..fa8806f 100644
--- a/src/huge_pages.h
+++ b/src/huge_pages.h
@@ -143,6 +143,12 @@ struct HugePageAllocator : HugePageAllocatorBase
     using value_type = T;

+    HugePageAllocator() noexcept = default;
+
+    template<class U>
+    HugePageAllocator(HugePageAllocator<U>) noexcept
+    {}
+
     T* allocate(size_t n) const {
         return static_cast<T*>(hp->allocate(n * sizeof(T)));
     }
@@ -151,11 +157,13 @@ struct HugePageAllocator : HugePageAllocatorBase
         hp->deallocate(p, n * sizeof(T));
     }

-    bool operator==(HugePageAllocator b) const {
+    template<class U>
+    bool operator==(HugePageAllocator<U> b) const {
         return hp == b.hp;
     }

-    bool operator!=(HugePageAllocator b) const {
+    template<class U>
+    bool operator!=(HugePageAllocator<U> b) const {
         return hp != b.hp;
     }
 };
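A minimal usage sketch of the allocator-aware API this patch adds. TaggedAllocator below is a hypothetical allocator written only for illustration; the two-argument queue constructor and get_allocator() are what the patch introduces.

#include <atomic_queue/atomic_queue.h>

#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical stateful allocator used only to illustrate the new API surface:
// it carries a tag so that two instances compare equal only when their tags match.
template<class T>
struct TaggedAllocator {
    using value_type = T;
    int tag = 0;

    TaggedAllocator() noexcept = default;
    explicit TaggedAllocator(int t) noexcept : tag(t) {}
    template<class U>
    TaggedAllocator(TaggedAllocator<U> other) noexcept : tag(other.tag) {}

    T* allocate(std::size_t n) { return std::allocator<T>{}.allocate(n); }
    void deallocate(T* p, std::size_t n) noexcept { std::allocator<T>{}.deallocate(p, n); }

    template<class U> bool operator==(TaggedAllocator<U> b) const noexcept { return tag == b.tag; }
    template<class U> bool operator!=(TaggedAllocator<U> b) const noexcept { return tag != b.tag; }
};

int main() {
    using atomic_queue::AtomicQueueB2;

    // The queue constructor now accepts an allocator instance and keeps it as a base
    // subobject, so the same allocator state is used for both the states_ and elements_ arrays.
    TaggedAllocator<std::uint64_t> alloc{42};
    AtomicQueueB2<std::uint64_t, TaggedAllocator<std::uint64_t>> q(1024, alloc);

    q.push(1);
    std::uint64_t v = q.pop();

    // get_allocator() returns a copy of the stored allocator, converted back to A.
    return (v == 1 && q.get_allocator() == alloc) ? 0 : 1;
}

The repository's own stateful allocator, HugePageAllocator in src/huge_pages.h, can be passed the same way once the hp pointer it inherits from HugePageAllocatorBase has been initialized.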