You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Herb Sutter在DDJ pillars of concurrency一文中抛出并行编程的三个简单论点,一是分离任务,使用更细粒度的锁或者无锁编程;二是尽量通过并行任务使用CPU资源,以提高系统吞吐量及扩展性;三是保证对共享资源访问的一致性。第三点已经被atomic、mutex、lock、condition_variable解决了,第一点和第二点则可以归结为如何对任务进行粒度划分并投递到任务的执行单元中去调度执行。任务划分依赖于各种不同业务的理解,例如网络和渲染,很难抽取出其共性。而任务的调度执行则是一种通用的结构,可以分为四个部分:
// load with 'consume' (data-dependent) memory orderingtemplate<typenameT>Tload_consume(Tconst*addr){// hardware fence is implicit on x86Tv=*const_cast<Tconstvolatile*>(addr);__memory_barrier();// compiler fencereturnv;}
// store with 'release' memory ordering template<typenameT> voidstore_release(Taddr,Tv) { // hardware fence is implicit on x86 __memory_barrier();// compiler fence const_cast<Tvolatile*>(addr)=v; }
// cache line size on modern x86 processors (in bytes) size_tconstcache_line_size=64;
// consumer part // accessed mainly by consumer, infrequently be producer node*tail_;// tail of the queue
// delimiter between consumer part and producer part, // so that they situated on different cache lines charcache_line_pad_[cache_line_size];
// producer part // accessed only by producer nodehead_;// head of the queue nodefirst_;// last unused node (tail of node cache) node*tail_copy_;// helper (points somewhere between first_ and tail_)
node*alloc_node() { // first tries to allocate node from internal node cache, // if attempt fails, allocates node via ::operator new()
/* * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
/* * ProducerConsumerQueue is a one producer and one consumer queue * without locks. */ template<classT> structProducerConsumerQueue{ typedefTvalue_type;
// size must be >= 2. // // Also, note that the number of usable slots in the queue at any // given time is actually (size-1), so if you start with an empty queue, // isFull() will return true after size-1 insertions. explicitProducerConsumerQueue(uint32_tsize) :size_(size) ,records_(static_cast<T>(std::malloc(sizeof(T)size))) ,readIndex_(0) ,writeIndex_(0) { assert(size>=2); if(!records_){ throwstd::bad_alloc(); } }
ProducerConsumerQueue(){ // We need to destruct anything that may still exist in our queue. // (No real synchronization needed at destructor time: only one // thread can be doing this.) if(!std::is_trivially_destructible<T>::value){ size_treadIndex=readIndex_; size_tendIndex=writeIndex_; while(readIndex!=endIndex){ records_[readIndex].T(); if(++readIndex==size_){ readIndex=0; } } }
<span>// queue is full</span>
<span>return</span> <span>false</span><span>;</span>
}
// move (or copy) the value at the front of the queue to given variable boolread(T&record){ autoconstcurrentRead=readIndex_.load(std::memory_order_relaxed); if(currentRead==writeIndex_.load(std::memory_order_acquire)){ // queue is empty returnfalse; }
// pointer to the value at the front of the queue (for use in-place) or // nullptr if empty. T*frontPtr(){ autoconstcurrentRead=readIndex_.load(std::memory_order_relaxed); if(currentRead==writeIndex_.load(std::memory_order_acquire)){ // queue is empty returnnullptr; } return&records_[currentRead]; }
// queue must not be empty voidpopFront(){ autoconstcurrentRead=readIndex_.load(std::memory_order_relaxed); assert(currentRead!=writeIndex_.load(std::memory_order_acquire));
boolisFull()const{ autonextRecord=writeIndex_.load(std::memory_order_acquire)+1; if(nextRecord==size_){ nextRecord=0; } if(nextRecord!=readIndex_.load(std::memory_order_acquire)){ returnfalse; } // queue is full returntrue; }
// * If called by consumer, then true size may be more (because producer may // be adding items concurrently). // * If called by producer, then true size may be less (because consumer may // be removing items concurrently). // * It is undefined to call this from any other thread. size_tsizeGuess()const{ intret=writeIndex_.load(std::memory_order_acquire)- readIndex_.load(std::memory_order_acquire); if(ret<0){ ret+=size_; } returnret; }
moody camel 在上面的基础上做了一些改进:在支持无大小限制的情况下,将动态内存分配的需求降得很低,同时支持了容量的动态增长。其容器结构是两层的queue of queue,第一层是循环链表,第二层是循环队列。第一层循环链表的控制基本等价于intel的spsc里的代码,而第二层的循环队列的控制基本等价于folly的代码。当enqueue的时候,发现没有空闲内存的时候会调用malloc,不过这种动态内存分配比起intel的每个新node都分配来说简单多了,总的来说还是比较符合wait_free的。这个的代码我就不分析了,直接贴作者的解释吧。
# Enqueue
If room in tail block, add to tail
Else check next block
If next block is not the head block, enqueue on next block
Else create a new block and enqueue there
Advance tail to the block we just enqueued to
Dequeue
Remember where the tail block is
If the front block has an element in it, dequeue it
Else
If front block was the tail block when we entered the function, return false
Else advance to next block and dequeue the item there
node(Tconst&v,handle_typenull_handle): data(v)//, next(tagged_node_handle(0, 0)) { /* increment tag to avoid ABA problem */ tagged_node_handleold_next=next.load(memory_order_relaxed); tagged_node_handlenew_next(null_handle,old_next.get_next_tag()); next.store(new_next,memory_order_release); }
typedeftypenamedetail::queue_signature::bind<A0,A1,A2>::typebound_args;staticconstboolhas_capacity=detail::extract_capacity<bound_args>::has_capacity;staticconstsize_tcapacity=detail::extract_capacity<bound_args>::capacity+1;// the queue uses one dummy nodestaticconstboolfixed_sized=detail::extract_fixed_sized<bound_args>::value;staticconstboolnode_based=!(has_capacity||fixed_sized);staticconstboolcompile_time_sized=has_capacity;typedeftypenamedetail::extract_allocator<bound_args,node>::typenode_allocator;typedeftypenamedetail::select_freelist<node,node_allocator,compile_time_sized,fixed_sized,capacity>::typepool_t;typedeftypenamepool_t::tagged_node_handletagged_node_handle;typedeftypenamedetail::select_tagged_handle<node,node_based>::handle_typehandle_type;
/** Check if the queue is empty** \return true, if the queue is empty, false otherwise* \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this* value in program logic.* */
boolempty(void){returnpool.get_handle(head_.load())==pool.get_handle(tail_.load());}
/** Pushes object t to the queue.** \post object will be pushed to the queue, if internal node can be allocated* \returns true, if the push operation is successful.** \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated* from the OS. This may not be lock-free.* */
boolpush(Tconst&t){returndo_push<false>(t);}
/** Pushes object t to the queue. \post object will be pushed to the queue, if internal node can be allocated * \returns true, if the push operation is successful. \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail * \throws if memory allocator throws * */
boolbounded_push(Tconst&t) { returndo_push<true>(t); }
/** Pops object from queue. * * \pre type U must be constructible by T and copyable, or T must be convertible to U * \post if pop operation is successful, object will be copied to ret. * \returns true, if the pop operation is successful, false if queue was empty. * * \note Thread-safe and non-blocking * */template<typenameU>boolpop(U&ret){usingdetail::likely;for(;;){tagged_node_handlehead=head_.load(memory_order_acquire);node*head_ptr=pool.get_pointer(head);
<span>tagged_node_handle</span> <span>tail</span> <span>=</span> <span>tail_</span><span>.</span><span>load</span><span>(</span><span>memory_order_acquire</span><span>);</span>
<span>tagged_node_handle</span> <span>next</span> <span>=</span> <span>head_ptr</span><span>-></span><span>next</span><span>.</span><span>load</span><span>(</span><span>memory_order_acquire</span><span>);</span>
<span>node</span> <span>*</span> <span>next_ptr</span> <span>=</span> <span>pool</span><span>.</span><span>get_pointer</span><span>(</span><span>next</span><span>);</span>
<span>tagged_node_handle</span> <span>head2</span> <span>=</span> <span>head_</span><span>.</span><span>load</span><span>(</span><span>memory_order_acquire</span><span>);</span>
<span>if</span> <span>(</span><span>likely</span><span>(</span><span>head</span> <span>==</span> <span>head2</span><span>))</span> <span>{</span>
<span>if</span> <span>(</span><span>pool</span><span>.</span><span>get_handle</span><span>(</span><span>head</span><span>)</span> <span>==</span> <span>pool</span><span>.</span><span>get_handle</span><span>(</span><span>tail</span><span>))</span> <span>{</span>
<span>if</span> <span>(</span><span>next_ptr</span> <span>==</span> <span>0</span><span>)</span>
<span>return</span> <span>false</span><span>;</span>
<span>tagged_node_handle</span> <span>new_tail</span><span>(</span><span>pool</span><span>.</span><span>get_handle</span><span>(</span><span>next</span><span>),</span> <span>tail</span><span>.</span><span>get_next_tag</span><span>());</span>
<span>tail_</span><span>.</span><span>compare_exchange_strong</span><span>(</span><span>tail</span><span>,</span> <span>new_tail</span><span>);</span>
<span>}</span> <span>else</span> <span>{</span>
<span>if</span> <span>(</span><span>next_ptr</span> <span>==</span> <span>0</span><span>)</span>
<span>/* this check is not part of the original algorithm as published by michael and scott</span>
* * however we reuse the tagged_ptr part for the freelist and clear the next part during node * allocation. we can observe a null-pointer here. * */ continue; detail::copy_payload(next_ptr->data,ret);
处理完了多生产者多消费者之间的映射,现在剩下的内容就是如何更高效的处理单生产者多消费者。moodycamel这里的主要改进就是单个queue的存储结构,这里采取的是两层的循环队列,第一层循环队列存储的是第二层循环队列的指针。一个队列只需要维护四个索引,考虑到原子性修改可以把消费者的两个索引合并为一个uint64或者uint32t,因为只有消费者会发生数据竞争,为了方便比较,也顺便把生产者的两个索引合并为一个uint64t or uint32t,这样就可以直接使用整数比较了。在enqueue的时候,数据复制完成之后,直接对生产者的索引自增即可。而dequeue的时候则没这么容易,此时首先自增消费者索引,然后判断当前消费者索引是否已经越过生产者索引,如果越过了,则对则对另外一个overcommit计数器进行自增,三个计数器合用才能获得真正的容量。
Concurrent Queue - spirits away
https://ift.tt/KtvABrp
Herb Sutter在DDJ
pillars of concurrency
一文中抛出并行编程的三个简单论点,一是分离任务,使用更细粒度的锁或者无锁编程;二是尽量通过并行任务使用CPU资源,以提高系统吞吐量及扩展性;三是保证对共享资源访问的一致性。第三点已经被atomic
、mutex
、lock
、condition_variable
解决了,第一点和第二点则可以归结为如何对任务进行粒度划分并投递到任务的执行单元中去调度执行。任务划分依赖于各种不同业务的理解,例如网络和渲染,很难抽取出其共性。而任务的调度执行则是一种通用的结构,可以分为四个部分:c++11
里提供了三种最基本的任务封装形式future, promise,packaged_task
c++17
里补全了任务结构控制,主要是提供了then, when_all, when_any
这三个用来关联多个future
的函数在整个并发任务系统中,在任务容器集合之上的任务调度结构是核心。现在使用的最广泛的任务容器是concurrent queue,下面我们来对concurrent queue来做一下分析。
naive concurrent queue
queue是一个维持先进先出(FIFO)队列的结构,在很多STL的实现之中采取的是多块连续内存的双向链表来维持其先进先出结构。为了在多线程中使用
std::queue
,最简单的方法就是使用锁来解决data race
,同时修改原始提供的接口,使得这个数据结构不会被用错。上述代码的主要考量如下:
empty
之后再pop
的处理流程是错误的,这两个操作必须封装在一起,所以这里提供了try_pop
和wait_and_pop
这两个接口来获取数据。shared_ptr
。这样就保证了如果在拷贝构造front
的时候出了trace也能维持整个queue
的结构完整。这个
concurrent_queue
并不是很高效,主要的drawback
包括如下三个方面:yield
,从而导致线程切换对应的常见解决方案:
mutex
,同时由于无锁最大的问题是内存分配,有些并发队列通过预先设置最大大小的方式来预分配内存,从而绕过了著名的ABA
问题queue
,这样我们就可以分离头节点和尾节点的访问;如果是固定大小的队列则可以采取ring buffer
的形式来维持队列结构。yield
,也就是对condition_variable
封装了一层。事实上,在这是一个并发
queue
的时候,首先要明确如下几个问题:这个
queue
的生产者和消费者各有多少个,常见的有单生产者单消费者(SPSC)、单生产者多消费者(SPMC)、多生产者单消费者(MPSC)和多生产者多消费者(MPMC)这个queue的最大元素大小是否确定,如果可以确定最大大小,则动态内存分配就可以避免,直接采取环形队列当作容器即可;如果无法确定最大大小,则只能通过动态内存分配的形式去处理,这里的难度加大了很多,因为要处理多线程的内存分配。
下面我们来看一下现在主流的几种
concurrent_queue
的实现,来分析一下他们对concurrent_queue
的实现优化。intel spsc concurrent queue
intel官方网站上提供了一个
SPSC queue
,但是这个queue
没有限制最大元素大小,如果临时内存不够的话会调用new
,可能会触发锁。这个代码的实现很简单粗暴,核心是一个单链表,对于单链表的任何操作都是
wait_free
的,这个链表有四个指针:tail
指针,指向下一个应该dequeue
的位置head
指针,指向最新的一个enqueue
的位置first_
指针,指向第一个可以回收node
的位置tail_copy
指针,指向一个安全的可以回收的node
的next
位置,他不一定指向tail
。在这个链表里,指针之间有如下关系:\(first \le tail\_copy \le tail \le head\) 。这里做的核心优化就是按需去更新
tail_copy
,没必要每次更新tail
的时候都把tail_copy
更新一遍,只有发现first == tail_copy
的时候才去更新一下。每个操作都没有使用到CAS
,因此都是wait_free
的,当然那一行调用了new
的除外。这里为了避免
False Sharing
使用了padding
。由于读线程只需要更改tail
,所以只需要在tail
之后加个padding
即可。facebook spsc concurrent queue
facebook 提供了固定大小的
SPSC queue
,代码在folly
的ProducerConsumerQueue
里。这里就是使用环形队列来作为容器,双指针来作为头尾,读线程读取
readIndex
直接采用relaxed
,写线程读取writeIndex
的时候也是采取relaxed
,因为这两个变量只会在对应的线程内修改,可以认为是对应线程的私有变量,如果要读取另外一个线程的变量则需要采取acquire
,当然前提是修改的时候使用了release
。为了避免False Sharing
这里也使用了padding
,只不过是用宏做的。其实这里也可以做一点优化,就像前面
intel
的延迟处理tail_copy
一样,首次读取另外一个线程变量的时候先用relaxed
,如果发现不能操作了,则再使用acquire
。总的来说,这个无锁
spsc queue
也是wait_free
的。moodycamel spsc concurrent queue
moody camel 在上面的基础上做了一些改进:在支持无大小限制的情况下,将动态内存分配的需求降得很低,同时支持了容量的动态增长。其容器结构是两层的
queue of queue
,第一层是循环链表,第二层是循环队列。第一层循环链表的控制基本等价于intel
的spsc
里的代码,而第二层的循环队列的控制基本等价于folly
的代码。当enqueue
的时候,发现没有空闲内存的时候会调用malloc
,不过这种动态内存分配比起intel
的每个新node
都分配来说简单多了,总的来说还是比较符合wait_free
的。这个的代码我就不分析了,直接贴作者的解释吧。naive spmc concurrent queue
在这前面介绍的
spsc
并发队列的基础上,我们可以比较容易的构建出一个spmc
的并发队列,而构造一个mpsc
的并发队列则难很多。其原因主要是在enqueue
的时候,可能会涉及到动态内存分配,如果有好几个线程都抢着进行动态内存分配的话,就会出现malloc
的锁征用。而多个线程抢占dequeue
的时候,只需要采取CAS
来保持tail
的更新即可,虽说这个不是waitfree
的,但是lockfree
还是可以基本保证的。boost mpmc concurrent queue
boost concurrent queue
通过模板参数的方式来支持固定大小的队列和不定大小的队列。如果是固定大小队列,则会使用一个带
dummy head
的ring buffer
来存储内容,同时使用一个头节点索引和一个尾节点索引来标记队列的头尾位置。为了一次性的修改头尾节点索引,这里将队列大小的上限设置为了\(2^{16} - 2\) ,这样两个索引就可以合并为一个int32
来处理,修改的时候可以使用compare_exchange_
来同时修改。如果在支持int64
类型的compare_exchange_
操作的平台,队列大小的上限可以放到\(2^{32} -2\) ,同时两个索引会被压缩为一个int64
来做同时修改。如果是不定大小的队列,则会使用链表的形式来维持队列结构, 代码见下。
这里比较有意思的就是第九行的注释:对指针的
tag
位置进行自增来避免ABA
问题。这里的next
指针是一个tagged_pointer
,其分配位置是内存对齐的,对齐的大小由BOOST_LOCKFREE_CACHELINE_BYTES
定义,在WIN
平台下,这个宏定义如下:当这个指针是64字节对齐时,最底的6位是没有意义的,所以这6位我们可以用来存储额外的数据,这种指针就叫做
tagged_pointer
,在llvm
里这个指针结构也很常见。在
boost lockfree queue
里,数据成员定义如下:因为
atomic<T*>
内部只包含一个T*
作为成员变量,所以atomic<T*>
与T*
的内存布局是一样的,所以这里的padding_size
才会这样计算出来。这里的padding
的意义在于让poll
的开始地址是BOOST_LOCKFREE_CACHELINE_BYTES
对齐的,同时这里分为了两个padding
而不是一个padding
主要是考虑到将tail head
分离在两个cache_line
上,避免不同线程之间的缓存竞争。现在我们来看这个
lockfree queue
提供的接口。首先查看
empty
。注释里写的很清楚了,这个函数的返回值是不准确的,因为在没有锁的情况下无法同时获得
head tail
的准确值。现在来看
push
,这里分为了两个接口push bounded_push
,区分在于如果内存池已经用完,第一个push
在当前队列是大小固定的情况下会返回false
,不固定的情况下会向操作系统尝试申请更多的内存并返回;而第二个bounded_push
则直接返回false
。这两个函数都调用了
do_push
,这个函数的定义如下:这里比较难理解的一点就是
tail tail2
,以及最后30行的compare_exchange_strong
。这里在19行使用判断的意义是避免在内部做无用功,虽然不使用19行的判断,tail
改变之后,20行所在分支的检测都会fail
掉,对正确性没影响,对性能上来说提升很大。在一个完整的成功push
流程中有两个cas
操作,我们需要担心的是在两个cas
操作之间线程被换出之后会出现何种结果,也就是在24行之前被换出。此时老的tail
的next
已经被修正为了新数据,而新tail
却没有更新。在下一个线程进来的时候会发现tail->next != 0
, 因此会进28号的分支,在此分支之内会尝试将tail->next
更新为tail
,这样就避免了数据更新到一半的尴尬局面。对于
pop
则只有一个函数:这里就简单多了,成功的
pop
只需要一个compare_exchange_weak
即可,所以就不需要担心数据更改到一半的问题,这里的28行处理的还是tail
数据更新到一半的问题。这里比较有意思的一点就是42行的
.template
,这个叫做template disambiguator, 其作用就是通知编译器destruct<true>
是一个模板,而不是destruct < true
。总的来说,
boost lockfree queue
的注意点完全在lock free
上,并没有采取每个生产者单独一个queue
的方式来解决争用,虽然我们可以在lockfree queue
的基础上做一个这样的东西。intel tbb concurrent queue
其实这个的总体实现与boost类似。占坑,以后填。粗看起来,这个东西的实现很具有
STL
的风格。moodycamel concurrent queue
这个
concurrent queue
的实现被很多项目使用过, 值得重点分析。这个实现的突出之处在于,每个生产者都维持自己的专属queue
,而不同的消费者会以不同的顺序去访问各个生产者的queue
,直到遇到一个不为空的queue
。简而言之,他所实现的MPMC(multiple producer multiple consumer)
的队列建立在了SPMC
的多线程队列的基础上。这个SPMC
的实现是lockfree
的,同时还增加了bulk
操作。下面来慢慢介绍这个的设计。首先就是在构建消费者的时候,尽可能的让消费者与生产者均衡绑定,内部实现是通过使用一个
token
来维持消费者与生产者之间的亲和性。其实最简单的亲和性分配的方法就是每个消费者分配一个生产者的编号,dequeue
的时候采取轮询的方式,每次开始轮询的时候都以上次dequeue
成功的生产者queue
开始。处理完了多生产者多消费者之间的映射,现在剩下的内容就是如何更高效的处理单生产者多消费者。moodycamel这里的主要改进就是单个
queue
的存储结构,这里采取的是两层的循环队列,第一层循环队列存储的是第二层循环队列的指针。一个队列只需要维护四个索引,考虑到原子性修改可以把消费者的两个索引合并为一个uint64
或者uint32t
,因为只有消费者会发生数据竞争,为了方便比较,也顺便把生产者的两个索引合并为一个uint64t or uint32t
,这样就可以直接使用整数比较了。在enqueue
的时候,数据复制完成之后,直接对生产者的索引自增即可。而dequeue
的时候则没这么容易,此时首先自增消费者索引,然后判断当前消费者索引是否已经越过生产者索引,如果越过了,则对则对另外一个overcommit
计数器进行自增,三个计数器合用才能获得真正的容量。这里使用环形缓冲来扩容而不是采取列表来扩容,主要是因为连续的空间操作可以支持批量的
enqueue
和dequeue
操作,直接预先占据一些索引就行了。via spiritsaway.info https://ift.tt/xtvPQyO
November 13, 2024 at 09:40AM
The text was updated successfully, but these errors were encountered: