```cpp
// Preshing's ArrayOfItems::SetItem with the pre-load optimization:
// only attempt the CAS once a plain relaxed load shows the slot is free
// (or already holds our key).
void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = 0;; idx++)
    {
        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;           // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}
```
```cpp
// MurmurHash3's integer finalizer, used below to spread keys across slots.
inline static uint32_t integerHash(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}
```
```cpp
// Hash-table SetItem: the same probe-then-CAS logic as above, but starting at the
// hashed key and wrapping within the power-of-two array m_arraySize.
// (Only the loop body appeared in the original excerpt; the signature and
// hash-based probe are reconstructed from Preshing's hash-table article.)
void HashTable::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;           // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}
```
```cpp
// Preshing's publish example: write the message, then publish it with a release
// fence followed by setting a flag in the lock-free collection.
void PublishMessage()
{
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: the only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1);
}
```
```cpp
// moodycamel::ConcurrentQueue, implicit-producer hash lookup (excerpt):
// walk the chain of hash tables from the newest (mainHash) to the oldest,
// and lazily re-insert a hit into mainHash so later lookups stay cheap.
auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
    // Look for the id in this hash
    auto index = hashedId;
    while (true) {  // Not an infinite loop because at least one slot is free in the hash table
        index &= hash->capacity - 1;
        auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
        if (probedKey == id) {
            // Found it! If we had to search several hashes deep, though, we should lazily add it
            // to the current main hash table to avoid the extended search next time.
            // Note there's guaranteed to be room in the current hash table since every subsequent
            // table implicitly reserves space for all previous tables (there's only one
            // implicitProducerHashCount).
            auto value = hash->entries[index].value;
            if (hash != mainHash) {
                index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1;
                    probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                    auto empty = details::invalid_thread_id;
                    // ... excerpt truncated here: the id is CAS'd into this mainHash slot
                    //     and the value copied over before returning ...
```
```cpp
// moodycamel::ConcurrentQueue, implicit-producer hash insert/resize (excerpt):
// bump the shared count, grab the resize flag if the table is over half full,
// otherwise insert into the existing table as long as it is under 3/4 full.
// Insert!
auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
while (true) {
    if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
        // We've acquired the resize lock, try to allocate a bigger hash table.
        // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
        // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
        // locked block).
        mainHash = implicitProducerHash.load(std::memory_order_acquire);
        if (newCount >= (mainHash->capacity >> 1)) {
            auto newCapacity = mainHash->capacity << 1;
            while (newCount >= (newCapacity >> 1)) {
                newCapacity <<= 1;
            }
            auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
            if (raw == nullptr) {
                // Allocation failed
                implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                return nullptr;
            }

            auto newHash = new (raw) ImplicitProducerHash;
            newHash->capacity = newCapacity;
            newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
            for (size_t i = 0; i != newCapacity; ++i) {
                new (newHash->entries + i) ImplicitProducerKVP;
                newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
            }
            newHash->prev = mainHash;
            implicitProducerHash.store(newHash, std::memory_order_release);
            implicitProducerHashResizeInProgress.clear(std::memory_order_release);
            mainHash = newHash;
        }
        else {
            implicitProducerHashResizeInProgress.clear(std::memory_order_release);
        }
    }

    // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
    // to finish being allocated by another thread (and if we just finished allocating above, the condition will
    // always be true)
    if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
        bool recycled;
        auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
        if (producer == nullptr) {
            implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
            return nullptr;
        }
        if (recycled) {
            implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
        }
        // ... excerpt truncated here: register a thread-exit listener, CAS the id
        //     into mainHash, store the producer, and return it ...
    }

    // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
    // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
    // we try to allocate ourselves).
    mainHash = implicitProducerHash.load(std::memory_order_acquire);
}
```
Lockfree Hashtable - spirits away
https://ift.tt/QMWdvgR
To make a hashtable lock-free, a handful of problems have to be solved; the sections below work through them one by one.
lockfree linear search
One of Preshing's articles describes a lock-free linear-scan implementation. It defines a basic Entry, with the convention that a key of 0 means the entry has not been used yet, so inserting a key of 0 is forbidden. On top of this structure a SetItem operation is defined, and GetItem probes the array in the same way; a sketch of the Entry and of the basic SetItem/GetItem is given below.
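Since only the optimized SetItem survives in the listings at the top of this page, here is a minimal sketch of the Entry structure and the basic SetItem/GetItem in the spirit of Preshing's ArrayOfItems, reusing the Mintomic helpers that appear in those listings (mint_atomic32_t, mint_load_32_relaxed, mint_store_32_relaxed, mint_compare_exchange_strong_32_relaxed); the include path, class layout, and member names are illustrative assumptions.

```cpp
#include <mintomic/mintomic.h>
#include <stdint.h>

struct Entry
{
    mint_atomic32_t key;    // 0 means "unused"; inserting key 0 is forbidden
    mint_atomic32_t value;
};

class ArrayOfItems
{
public:
    // Claim the first slot that is free (key == 0) or already holds `key`,
    // then store the value. Every probe does a CAS in this basic version.
    void SetItem(uint32_t key, uint32_t value)
    {
        for (uint32_t idx = 0;; idx++)
        {
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey == 0) || (prevKey == key))
            {
                mint_store_32_relaxed(&m_entries[idx].value, value);
                return;
            }
            // Slot holds some other key: keep scanning.
        }
    }

    // Scan until the key or an unused slot is found; 0 means "not found".
    uint32_t GetItem(uint32_t key)
    {
        for (uint32_t idx = 0;; idx++)
        {
            uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
            if (probedKey == key)
                return mint_load_32_relaxed(&m_entries[idx].value);
            if (probedKey == 0)
                return 0;
        }
    }

private:
    Entry* m_entries;   // caller-provided, zero-initialized array (sketch only)
};
```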
The question now is that all of the atomic operations here use relaxed semantics, which on x86 compile down to plain loads and stores with no extra ordering. How, then, can the value store in SetItem be made visible to the value load in GetItem? In fact this cannot be guaranteed at all: a thread may be swapped out right before SetItem's value store, and another thread executing GetItem's value load will still read the old value. Beyond that, it is also possible that the CAS in SetItem has not yet propagated to other cores while the value store has already become visible to the thread running GetItem, in which case GetItem returns 0. Both outcomes are legal: in a multithreaded program the moment at which a read happens is nondeterministic, so reading stale data is normal. One could even argue that without a notification mechanism, asking whether a value is "the latest" is meaningless. To build a publish-listen mechanism, the publisher must set an atomic bool flag to true around its SetItem, and that store must use release semantics; when the other thread reads (or CASes) that flag, it must use acquire semantics. Only then, once the data has been written and the notification published, is the other side guaranteed to see the up-to-date data. So the only thing this lock-free hashtable design really does for multithreading is to prevent several threads from performing SetItem on the same entry at the same time.
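To make the publish-listen idea concrete, here is a minimal consumer-side sketch to pair with the PublishMessage listing above, assuming the same `collection`, `message`, and SHARED_FLAG_KEY globals from that listing. Mintomic has no acquire loads, so the flag is read through GetItem (relaxed) and followed by an acquire fence.

```cpp
#include <stdio.h>

// Poll for the flag set by PublishMessage; returns true once the message
// has been safely observed.
bool TryConsumeMessage()
{
    // Relaxed read of the flag through the lock-free collection.
    if (collection.GetItem(SHARED_FLAG_KEY) != 1)
        return false;

    // Acquire fence: pairs with the release fence in PublishMessage, so the
    // non-atomic write to `message` is visible from here on.
    mint_thread_fence_acquire();

    printf("%s\n", message);
    return true;
}
```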
Preshing also has an optimization for SetItem: reducing unnecessary CAS operations. The original implementation walks the array issuing a CAS on every slot, yet a CAS is only needed when the probed key is 0 or equal to our own key. The optimization is therefore to do a plain load first and attempt the CAS only when the load shows the slot can actually be taken; that is the version shown in the SetItem listing at the top of this page.

naive lockfree hashtable
On top of the lock-free linear scan above, building a lock-free hashtable is fairly straightforward. Three functions are defined: integerHash, SetItem, and GetItem (the first two appear in the listings at the top of this page). The hash function is MurmurHash3's integer finalizer, which reportedly lets every bit of the key contribute roughly equally to the result. SetItem only works correctly under one precondition: the hashtable must not be full; if it is full, it will definitely go wrong. GetItem is the same probe as before, the so-called publish function is unchanged as well, and for deletion we can designate a particular value to mean "this entry has been deleted", so that SetItem(key, 0) emulates a delete. A sketch of the hashed GetItem and the delete wrapper follows.
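A minimal sketch of the hashed GetItem and the sentinel-based delete, written in the same style as the HashTable::SetItem listing above; `m_arraySize` is assumed to be the power-of-two table capacity, and `DeleteItem` is an illustrative name rather than part of the original code.

```cpp
// Probe from the hashed key, wrapping around the power-of-two array;
// returns 0 when the key is absent (or has been "deleted").
uint32_t HashTable::GetItem(uint32_t key)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;   // Hit an unused slot: the key was never inserted.
    }
}

// "Delete" by storing the agreed-upon sentinel value 0; the key itself
// stays in the table and keeps occupying its slot.
void HashTable::DeleteItem(uint32_t key)
{
    SetItem(key, 0);
}
```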
what about full
The lock-free hashtable above has a fatal flaw: it does not handle the table becoming full. To handle fullness, we cap the number of probes at the current table's capacity and maintain several independent hashtables, chained together by a lock-free linked list of table pointers. If the probe limit is reached and the current hashtable has no pointer to a next hashtable, we first create a new hashtable and attach it to the lock-free list; that brings us back to the case where a next table exists, and the search then continues recursively in that next table.

This does solve the growth problem, but it degrades performance: keys inserted later become slower and slower to look up, because lookups routinely have to walk several layers of hashtables. To avoid this, a different design is used: every time a new layer is added, its capacity is doubled, and the contents of the previous hashtable are copied into the new one. The newest hashtable is called main_hashtable. Since we cannot copy an entire hashtable over without locks, the copy is done lazily, roughly as follows:

1. maintain a lock-free linked list of hashtables whose head node is main_hashtable; all hashtables are connected through a next pointer;
2. on insertion, if the load factor of main_hashtable (a load factor that counts the keys of all tables) already exceeds 0.5, create a new hashtable and insert into that new hashtable instead;
3. the newly created hashtable is empty and twice the size of the current main_hashtable; every newly added hashtable is inserted at the head of the list, making it the new main_hashtable;
4. lookups follow the next pointer from table to table, down to the last hashtable;
5. if the hashtable in which the key is finally found is not main_hashtable, insert that key-value pair into main_hashtable as well — this is the core of the lazy copy.

The code for this lazy lookup is the first moodycamel excerpt near the top of this page; a simplified, self-contained sketch of the same idea follows.
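The following is a simplified std::atomic sketch of the chained-table design just described, not moodycamel's actual code. It reuses the integerHash function shown earlier and covers only the lookup/lazy-copy path; growth is handled separately (see the next sketch). The names Table, SetItemIn, and g_main are illustrative.

```cpp
#include <atomic>
#include <cstdint>

uint32_t integerHash(uint32_t h);      // the finalizer shown earlier

// One fixed-capacity table in the chain; key == 0 means "unused".
struct Table
{
    uint32_t capacity;                 // power of two
    std::atomic<uint32_t>* keys;       // zero-initialized
    std::atomic<uint32_t>* values;
    Table* next;                       // older, smaller table (or nullptr)
};

std::atomic<Table*> g_main;            // head of the chain = main_hashtable

// Insert (or overwrite) key in one specific table; the caller guarantees
// that this table is not full.
void SetItemIn(Table* t, uint32_t key, uint32_t value)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= t->capacity - 1;
        uint32_t probed = t->keys[idx].load(std::memory_order_relaxed);
        if (probed != key)
        {
            if (probed != 0)
                continue;                          // another key lives here; keep probing
            uint32_t expected = 0;
            if (!t->keys[idx].compare_exchange_strong(expected, key, std::memory_order_relaxed)
                && expected != key)
                continue;                          // lost the race to a different key
        }
        t->values[idx].store(value, std::memory_order_relaxed);
        return;
    }
}

// Walk the chain from the newest table to the oldest; on a hit in an older
// table, lazily copy the pair into the current main table.
uint32_t GetItem(uint32_t key)
{
    Table* mainTable = g_main.load(std::memory_order_acquire);
    for (Table* t = mainTable; t != nullptr; t = t->next)
    {
        for (uint32_t idx = integerHash(key);; idx++)
        {
            idx &= t->capacity - 1;
            uint32_t probed = t->keys[idx].load(std::memory_order_relaxed);
            if (probed == key)
            {
                uint32_t value = t->values[idx].load(std::memory_order_relaxed);
                if (t != mainTable)
                    SetItemIn(mainTable, key, value);   // the lazy copy step
                return value;
            }
            if (probed == 0)
                break;                 // not in this table; fall through to the older one
        }
    }
    return 0;
}
```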
The hard part now shifts to growing the table. A resize involves dynamic memory allocation and initialization of the new entries, which is expensive, so multiple threads must be kept from fighting over the right to resize. moodycamel's design handles this as in the second excerpt near the top of this page: the test_and_set on implicitProducerHashResizeInProgress is the fight for ownership, and a thread only enters the resize path when the load factor already exceeds 0.5 and the resize flag is not yet set. The flag acts like a lock for the duration of the resize and is cleared when the resize finishes. Because the thread holding it can be swapped out, however, the flag alone could leave other threads spinning forever without ever winning ownership, so a thread that fails to grab the resize flag gets one more check: if the load factor is still below 0.75, it simply tries to insert into the old table and moves on. A distilled version of this logic is sketched below.
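Continuing the std::atomic sketch from above — again an illustration, not moodycamel's code — this isolates the resize-ownership logic. AllocateTable is a hypothetical helper that returns an empty table (all keys 0) of the requested capacity.

```cpp
std::atomic<uint32_t> g_count{0};                  // total keys across all tables
std::atomic_flag      g_resizing = ATOMIC_FLAG_INIT;

Table* AllocateTable(uint32_t capacity);           // hypothetical: empty table, keys all 0

void SetItem(uint32_t key, uint32_t value)
{
    uint32_t newCount = 1 + g_count.fetch_add(1, std::memory_order_relaxed);
    while (true)
    {
        Table* mainTable = g_main.load(std::memory_order_acquire);

        // Over half full and nobody resizing yet: try to take ownership of the resize.
        if (newCount >= (mainTable->capacity >> 1) &&
            !g_resizing.test_and_set(std::memory_order_acquire))
        {
            // Re-read the head: another thread may have grown the table before we got the flag.
            mainTable = g_main.load(std::memory_order_acquire);
            if (newCount >= (mainTable->capacity >> 1))
            {
                Table* bigger = AllocateTable(mainTable->capacity * 2);
                bigger->next = mainTable;
                g_main.store(bigger, std::memory_order_release);
                mainTable = bigger;
            }
            g_resizing.clear(std::memory_order_release);
        }

        // Below three-quarters full: insert into whatever the head is right now,
        // even if another thread is still busy allocating a bigger table.
        if (newCount < (mainTable->capacity >> 1) + (mainTable->capacity >> 2))
        {
            SetItemIn(mainTable, key, value);
            return;
        }

        // Otherwise the old table is nearly full and someone else holds the resize
        // flag: loop and wait for the new head to appear (the "almost deadlock"
        // case discussed below if that thread gets swapped out).
    }
}
```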
In moodycamel's code that branch does one more thing: once the thread has actually obtained an implicit producer, it registers a thread-exit callback, and that callback destroys the producer and removes the corresponding key from the hashtable. That leaves one last case: a thread cannot win resize ownership and the load factor is already past 0.75. At that point there is nothing to do but spin, which is effectively a deadlock. The situation is rare but can still be constructed: the thread that is in the middle of resizing gets swapped out. I do not know how the original author handles this case.
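One common way to obtain such a thread-exit callback — not necessarily what moodycamel does internally — is a thread_local object whose destructor performs the cleanup. OnThreadExit below is a hypothetical hook standing in for "destroy the producer and erase its key".

```cpp
#include <cstdint>

void OnThreadExit(uint32_t key);       // hypothetical: destroy producer, erase key

// A thread_local guard whose destructor runs when the owning thread exits;
// the destructor is the "callback" that cleans up this thread's entry.
struct ThreadExitGuard
{
    uint32_t key   = 0;
    bool     armed = false;
    ~ThreadExitGuard() { if (armed) OnThreadExit(key); }
};

thread_local ThreadExitGuard t_exitGuard;

// Call right after this thread has obtained its implicit producer slot.
void ArmThreadExitCleanup(uint32_t key)
{
    t_exitGuard.key   = key;
    t_exitGuard.armed = true;
}
```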
via spiritsaway.info https://ift.tt/xtvPQyO
November 13, 2024 at 09:40AM