From 4b4d9712c834e46accd3bd05c8e1081c43aeec72 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 30 Oct 2016 16:40:03 -0700 Subject: [PATCH] Update shaded JCTools queue due to live-lock (fixes #127) In #127, a live-lock occurs in the write buffer due to a race when the consumer tries to transition to the last buffer. This is a shaded copy of JCTools' MpscGrowableLinkedArrayQueue and fixed in 2.0 (see https://github.com/JCTools/JCTools/issues/135). This is a difficult to reproduce bug, except for the reporter due to having thousands of instances on a loaded system. The shaded copy is more verbose than the previous one, but that is to minimize the divergency and introducing bugs by cutting too deep. Updating to Guava 20 required minor changes due to that cache now supporting write-through entries. The adapter now mirrors the new behavior. Thanks to everyones patience and help in fixing the bug. --- caffeine/build.gradle | 1 - .../caffeine/cache/Specifications.java | 3 +- .../benmanes/caffeine/base/UnsafeAccess.java | 1 + .../caffeine/cache/BoundedLocalCache.java | 3 +- .../cache/MpscGrowableArrayQueue.java | 658 ++++++++++++++++++ .../benmanes/caffeine/cache/WriteBuffer.java | 357 ---------- .../benmanes/caffeine/cache/EvictionTest.java | 2 +- ...t.java => MpscGrowableArrayQueueTest.java} | 28 +- config/findbugs/exclude.xml | 2 +- gradle/dependencies.gradle | 27 +- gradle/wrapper/gradle-wrapper.properties | 2 +- guava/build.gradle | 1 + .../caffeine/guava/CaffeinatedGuavaCache.java | 23 - .../cache/MpscGrowableQueueSanityTest.java | 43 ++ .../caffeine/cache/QueueSanityTest.java | 317 +++++++++ .../common/cache/PopulatedCachesTest.java | 22 +- 16 files changed, 1067 insertions(+), 423 deletions(-) create mode 100644 caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java delete mode 100644 caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java rename caffeine/src/test/java/com/github/benmanes/caffeine/cache/{WriteBufferTest.java => MpscGrowableArrayQueueTest.java} (82%) create mode 100644 guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java create mode 100644 guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java diff --git a/caffeine/build.gradle b/caffeine/build.gradle index 876b8523ae..5ca45be13e 100644 --- a/caffeine/build.gradle +++ b/caffeine/build.gradle @@ -41,7 +41,6 @@ dependencies { jmh benchmark_libraries.infinispan jmh benchmark_libraries.jackrabbit jmh benchmark_libraries.elastic_search - jmh benchmark_libraries.jctools_experimental jmh benchmark_libraries.concurrentlinkedhashmap javaPoetCompile libraries.guava diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java index 8b57026c8d..9b9d043aab 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java @@ -99,7 +99,8 @@ public final class Specifications { public static final TypeName WRITE_ORDER_DEQUE = ParameterizedTypeName.get( ClassName.get(PACKAGE_NAME, "WriteOrderDeque"), NODE); - public static final ClassName WRITE_QUEUE_TYPE = ClassName.get(PACKAGE_NAME, "WriteBuffer"); + public static final ClassName WRITE_QUEUE_TYPE = + ClassName.get(PACKAGE_NAME, "MpscGrowableArrayQueue"); public static final TypeName WRITE_QUEUE = ParameterizedTypeName.get( WRITE_QUEUE_TYPE, ClassName.get(Runnable.class)); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java b/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java index c2ab6e3e19..8a748e0717 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java @@ -29,6 +29,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ +@SuppressWarnings("restriction") public final class UnsafeAccess { static final String ANDROID = "THE_ONE"; static final String OPEN_JDK = "theUnsafe"; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 5514142a2b..015a19edd6 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -235,7 +235,7 @@ protected boolean buffersWrites() { return false; } - protected WriteBuffer writeBuffer() { + protected MpscGrowableArrayQueue writeBuffer() { throw new UnsupportedOperationException(); } @@ -634,6 +634,7 @@ void evictFromMain(int candidates) { * @param victimKey the key for the entry chosen by the eviction policy for replacement * @return if the candidate should be admitted and the victim ejected */ + @GuardedBy("evictionLock") boolean admit(K candidateKey, K victimKey) { int victimFreq = frequencySketch().frequency(victimKey); int candidateFreq = frequencySketch().frequency(candidateKey); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java new file mode 100644 index 0000000000..d248e6a187 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java @@ -0,0 +1,658 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static com.github.benmanes.caffeine.base.UnsafeAccess.UNSAFE; +import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ARRAY_BASE; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.lvElement; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.soElement; + +import java.lang.reflect.Field; +import java.util.AbstractQueue; +import java.util.Iterator; + +/** + * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in + * linked chunks of the initial size. The queue grows only when the current buffer is full and + * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer + * for the consumer to follow.
+ *

+ * This is a shaded copy of MpscGrowableArrayQueue provided by + * JCTools from version 2.0. + * + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +final class MpscGrowableArrayQueue extends MpscChunkedArrayQueue { + + /** + * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the + * chunk size. Must be 2 or more. + * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will + * be the upper limit of number of elements in this queue. Must be 4 or more and round up + * to a larger power of 2 than initialCapacity. + */ + public MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) { + super(initialCapacity, maxCapacity); + } + + @Override + protected int getNextBufferSize(E[] buffer) { + long maxSize = maxQueueCapacity / 2; + if (buffer.length > maxSize) { + throw new IllegalStateException(); + } + final int newSize = 2 * (buffer.length - 1); + return newSize + 1; + } + + @Override + protected long getCurrentBufferCapacity(long mask) { + return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask; + } +} + +abstract class MpscChunkedArrayQueue extends MpscChunkedArrayQueueColdProducerFields { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; + + public MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) { + super(initialCapacity, maxCapacity); + } + + @Override + protected long availableInQueue(long pIndex, long cIndex) { + return maxQueueCapacity - (pIndex - cIndex); + } + + @Override + public int capacity() { + return (int) (maxQueueCapacity / 2); + } + + @Override + protected int getNextBufferSize(E[] buffer) { + return buffer.length; + } + + @Override + protected long getCurrentBufferCapacity(long mask) { + return mask; + } +} + + +abstract class MpscChunkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueue { + protected final long maxQueueCapacity; + + public MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) { + super(initialCapacity); + if (maxCapacity < 4) { + throw new IllegalArgumentException("Max capacity must be 4 or more"); + } + if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) { + throw new IllegalArgumentException( + "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)"); + } + maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1; + } +} + +abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue { + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + + +abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedArrayQueuePad1 { + protected long producerIndex; +} + +abstract class BaseMpscLinkedArrayQueuePad2 extends BaseMpscLinkedArrayQueueProducerFields { + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedArrayQueuePad2 { + protected long consumerMask; + protected E[] consumerBuffer; + protected long consumerIndex; +} + +abstract class BaseMpscLinkedArrayQueuePad3 extends BaseMpscLinkedArrayQueueConsumerFields { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +abstract class BaseMpscLinkedArrayQueueColdProducerFields + extends BaseMpscLinkedArrayQueuePad3 { + protected volatile long producerLimit; + protected long producerMask; + protected E[] producerBuffer; +} + +@SuppressWarnings({"PMD", "restriction"}) +abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields { + // No post padding here, subclasses must add + + private final static long P_INDEX_OFFSET; + private final static long C_INDEX_OFFSET; + private final static long P_LIMIT_OFFSET; + + static { + try { + Field iField = BaseMpscLinkedArrayQueueProducerFields.class.getDeclaredField("producerIndex"); + P_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + try { + Field iField = BaseMpscLinkedArrayQueueConsumerFields.class.getDeclaredField("consumerIndex"); + C_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + try { + Field iField = + BaseMpscLinkedArrayQueueColdProducerFields.class.getDeclaredField("producerLimit"); + P_LIMIT_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + private final static Object JUMP = new Object(); + + /** + * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the + * chunk size. Must be 2 or more. + */ + public BaseMpscLinkedArrayQueue(final int initialCapacity) { + if (initialCapacity < 2) { + throw new IllegalArgumentException("Initial capacity must be 2 or more"); + } + + int p2capacity = ceilingPowerOfTwo(initialCapacity); + // leave lower bit of mask clear + long mask = (p2capacity - 1) << 1; + // need extra element to point at next array + E[] buffer = allocate(p2capacity + 1); + producerBuffer = buffer; + producerMask = mask; + consumerBuffer = buffer; + consumerMask = mask; + soProducerLimit(mask); // we know it's all empty to start with + } + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return getClass().getName() + "@" + Integer.toHexString(hashCode()); + } + + @Override + public boolean offer(final E e) { + if (null == e) { + throw new NullPointerException(); + } + + long mask; + E[] buffer; + long pIndex; + + while (true) { + long producerLimit = lvProducerLimit(); + pIndex = lvProducerIndex(); + // lower bit is indicative of resize, if we see it we spin until it's cleared + if ((pIndex & 1) == 1) { + continue; + } + // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1) + + // mask/buffer may get changed by resizing -> only use for array access after successful CAS. + mask = this.producerMask; + buffer = this.producerBuffer; + // a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex) + + // assumption behind this optimization is that queue is almost always empty or near empty + if (producerLimit <= pIndex) { + int result = offerSlowPath(mask, pIndex, producerLimit); + switch (result) { + case 0: + break; + case 1: + continue; + case 2: + return false; + case 3: + resize(mask, buffer, pIndex, e); + return true; + } + } + + if (casProducerIndex(pIndex, pIndex + 2)) { + break; + } + } + // INDEX visible before ELEMENT, consistent with consumer expectation + final long offset = modifiedCalcElementOffset(pIndex, mask); + soElement(buffer, offset, e); + return true; + } + + /** + * We do not inline resize into this method because we do not resize on fill. + */ + private int offerSlowPath(long mask, long pIndex, long producerLimit) { + int result; + final long cIndex = lvConsumerIndex(); + long bufferCapacity = getCurrentBufferCapacity(mask); + result = 0;// 0 - goto pIndex CAS + if (cIndex + bufferCapacity > pIndex) { + if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { + result = 1;// retry from top + } + } + // full and cannot grow + else if (availableInQueue(pIndex, cIndex) <= 0) { + result = 2;// -> return false; + } + // grab index for resize -> set lower bit + else if (casProducerIndex(pIndex, pIndex + 1)) { + result = 3;// -> resize + } else { + result = 1;// failed resize attempt, retry from top + } + return result; + } + + /** + * @return available elements in queue * 2 + */ + protected abstract long availableInQueue(long pIndex, final long cIndex); + + /** + * This method assumes index is actually (index << 1) because lower bit is used for resize. This + * is compensated for by reducing the element shift. The computation is constant folded, so + * there's no cost. + */ + private static long modifiedCalcElementOffset(long index, long mask) { + return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1)); + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E poll() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null) { + if (index != lvProducerIndex()) { + // poll() == null iff queue is empty, null element is not strong enough indicator, so we + // must + // check the producer index. If the queue is indeed not empty we spin until element is + // visible. + do { + e = lvElement(buffer, offset); + } while (e == null); + } else { + return null; + } + } + if (e == JUMP) { + final E[] nextBuffer = getNextBuffer(buffer, mask); + return newBufferPoll(nextBuffer, index); + } + soElement(buffer, offset, null); + soConsumerIndex(index + 2); + return (E) e; + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E peek() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null && index != lvProducerIndex()) { + // peek() == null iff queue is empty, null element is not strong enough indicator, so we must + // check the producer index. If the queue is indeed not empty we spin until element is + // visible. + while ((e = lvElement(buffer, offset)) == null) { + ; + } + } + if (e == JUMP) { + return newBufferPeek(getNextBuffer(buffer, mask), index); + } + return (E) e; + } + + @SuppressWarnings("unchecked") + private E[] getNextBuffer(final E[] buffer, final long mask) { + final long nextArrayOffset = nextArrayOffset(mask); + final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset); + soElement(buffer, nextArrayOffset, null); + return nextBuffer; + } + + private long nextArrayOffset(final long mask) { + return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE); + } + + private E newBufferPoll(E[] nextBuffer, final long index) { + final long offsetInNew = newBufferAndOffset(nextBuffer, index); + final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad + if (n == null) { + throw new IllegalStateException("new buffer must have at least one element"); + } + soElement(nextBuffer, offsetInNew, null);// StoreStore + soConsumerIndex(index + 2); + return n; + } + + private E newBufferPeek(E[] nextBuffer, final long index) { + final long offsetInNew = newBufferAndOffset(nextBuffer, index); + final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad + if (null == n) { + throw new IllegalStateException("new buffer must have at least one element"); + } + return n; + } + + private long newBufferAndOffset(E[] nextBuffer, final long index) { + consumerBuffer = nextBuffer; + consumerMask = (nextBuffer.length - 2) << 1; + final long offsetInNew = modifiedCalcElementOffset(index, consumerMask); + return offsetInNew; + } + + @Override + public final int size() { + // NOTE: because indices are on even numbers we cannot use the size util. + + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer + * and consumer indices, therefore protection is required to ensure size is within valid range. + * In the event of concurrent polls/offers to this method the size is OVER estimated as we read + * consumer index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + long size; + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + size = ((currentProducerIndex - after) >> 1); + break; + } + } + // Long overflow is impossible, so size is always positive. Integer overflow is possible for the + // unbounded + // indexed queues. + if (size > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) size; + } + } + + @Override + public final boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there + // is + // nothing we can do to make this an exact method. + return (this.lvConsumerIndex() == this.lvProducerIndex()); + } + + private long lvProducerIndex() { + return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + } + + private long lvConsumerIndex() { + return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + } + + private void soProducerIndex(long v) { + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); + } + + private boolean casProducerIndex(long expect, long newValue) { + return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); + } + + private void soConsumerIndex(long v) { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); + } + + private long lvProducerLimit() { + return producerLimit; + } + + private boolean casProducerLimit(long expect, long newValue) { + return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue); + } + + private void soProducerLimit(long v) { + UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v); + } + + public long currentProducerIndex() { + return lvProducerIndex() / 2; + } + + public long currentConsumerIndex() { + return lvConsumerIndex() / 2; + } + + public abstract int capacity(); + + public boolean relaxedOffer(E e) { + return offer(e); + } + + @SuppressWarnings("unchecked") + public E relaxedPoll() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null) { + return null; + } + if (e == JUMP) { + final E[] nextBuffer = getNextBuffer(buffer, mask); + return newBufferPoll(nextBuffer, index); + } + soElement(buffer, offset, null); + soConsumerIndex(index + 2); + return (E) e; + } + + @SuppressWarnings("unchecked") + public E relaxedPeek() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == JUMP) { + return newBufferPeek(getNextBuffer(buffer, mask), index); + } + return (E) e; + } + + private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) { + int newBufferLength = getNextBufferSize(oldBuffer); + final E[] newBuffer = allocate(newBufferLength); + + producerBuffer = newBuffer; + final int newMask = (newBufferLength - 2) << 1; + producerMask = newMask; + + final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask); + final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask); + + + soElement(newBuffer, offsetInNew, e);// element in new array + soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked + + // ASSERT code + final long cIndex = lvConsumerIndex(); + final long availableInQueue = availableInQueue(pIndex, cIndex); + if (availableInQueue <= 0) { + throw new IllegalStateException(); + } + + // Invalidate racing CASs + // We never set the limit beyond the bounds of a buffer + soProducerLimit(pIndex + Math.min(newMask, availableInQueue)); + + // make resize visible to the other producers + soProducerIndex(pIndex + 2); + + // INDEX visible before ELEMENT, consistent with consumer expectation + + // make resize visible to consumer + soElement(oldBuffer, offsetInOld, JUMP); + } + + @SuppressWarnings("unchecked") + public static E[] allocate(int capacity) { + return (E[]) new Object[capacity]; + } + + /** + * @return next buffer size(inclusive of next array pointer) + */ + protected abstract int getNextBufferSize(E[] buffer); + + /** + * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2 + */ + protected abstract long getCurrentBufferCapacity(long mask); +} + + +/** + * A concurrent access enabling class used by circular array based queues this class exposes an + * offset computation method along with differently memory fenced load/store methods into the + * underlying array. The class is pre-padded and the array is padded on either side to help with + * False sharing prvention. It is expected theat subclasses handle post padding. + *

+ * Offset calculation is separate from access to enable the reuse of a give compute offset. + *

+ * Load/Store methods using a buffer parameter are provided to allow the prevention of final + * field reload after a LoadLoad barrier. + *

+ */ +@SuppressWarnings("restriction") +final class UnsafeRefArrayAccess { + public static final long REF_ARRAY_BASE; + public static final int REF_ELEMENT_SHIFT; + static { + final int scale = UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) { + REF_ELEMENT_SHIFT = 2; + } else if (8 == scale) { + REF_ELEMENT_SHIFT = 3; + } else { + throw new IllegalStateException("Unknown pointer size"); + } + REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class); + } + + private UnsafeRefArrayAccess() {} + + /** + * A plain store (no ordering/fences) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @param e an orderly kitty + */ + public static void spElement(E[] buffer, long offset, E e) { + UNSAFE.putObject(buffer, offset, e); + } + + /** + * An ordered store(store + StoreStore barrier) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset} + * @param e an orderly kitty + */ + public static void soElement(E[] buffer, long offset, E e) { + UNSAFE.putOrderedObject(buffer, offset, e); + } + + /** + * A plain load (no ordering/fences) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + public static E lpElement(E[] buffer, long offset) { + return (E) UNSAFE.getObject(buffer, offset); + } + + /** + * A volatile load (load + LoadLoad barrier) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + public static E lvElement(E[] buffer, long offset) { + return (E) UNSAFE.getObjectVolatile(buffer, offset); + } + + /** + * @param index desirable element index + * @return the offset in bytes within the array for a given index. + */ + public static long calcElementOffset(long index) { + return REF_ARRAY_BASE + (index << REF_ELEMENT_SHIFT); + } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java deleted file mode 100644 index 6b1da59bc7..0000000000 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.benmanes.caffeine.cache; - -import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo; -import static java.util.Objects.requireNonNull; - -import com.github.benmanes.caffeine.base.UnsafeAccess; -import com.github.benmanes.caffeine.cache.WBHeader.ConsumerRef; - -/** - * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in - * linked chunks of the initial size. The queue grows only when the current buffer is full and - * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer - * for the consumer to follow. - *

- * This is a trimmed down version of MpscChunkedArrayQueue provided by - * JCTools. - * - * @author nitsanw@yahoo.com (Nitsan Wakart) - * @author ben.manes@gmail.com (Ben Manes) - */ -@SuppressWarnings("restriction") -final class WriteBuffer extends ConsumerRef { - long p60, p61, p62, p63, p64, p65, p66, p67; - long p70, p71, p72, p73, p74, p75, p76; - - static final long ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class); - static final Object JUMP = new Object(); - static final int ELEMENT_SHIFT; - - /** - * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the - * chunk size. Must be 2 or more. - * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will - * be the upper limit of number of elements in this queue. Must be 4 or more and round up - * to a larger power of 2 than initialCapacity. - */ - public WriteBuffer(int initialCapacity, int maxCapacity) { - Caffeine.requireArgument(maxCapacity > 4); - Caffeine.requireArgument(initialCapacity > 2); - if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) { - throw new IllegalArgumentException( - "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)"); - } - - int p2capacity = ceilingPowerOfTwo(initialCapacity); - // leave lower bit of mask clear - long mask = (p2capacity - 1) << 1; - // need extra element to point at next array - @SuppressWarnings("unchecked") - E[] buffer = (E[]) new Object[p2capacity + 1]; - producerBuffer = buffer; - producerMask = mask; - consumerBuffer = buffer; - consumerMask = mask; - maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1; - soProducerLimit(mask); // we know it's all empty to start with - } - - @SuppressWarnings({"PMD.SwitchStmtsShouldHaveDefault", "PMD.MissingBreakInSwitch"}) - public boolean offer(E e) { - requireNonNull(e); - - long mask; - E[] buffer; - long producerIndex; - for (;;) { - long producerLimit = lvProducerLimit(); - producerIndex = lvProducerIndex(); - - // lower bit is indicative of resize, if we see it we spin until it's cleared - if ((producerIndex & 1) == 1) { - continue; - } - // producerIndex is even (lower bit is 0) -> actual index is (producerIndex >> 1) - - // mask/buffer may get changed by resizing -> only use for array access after successful CAS - mask = producerMask; - buffer = producerBuffer; - // a successful CAS ties the ordering, lv(producerIndex)-[mask/buffer]->cas(producerIndex) - - // assumption behind this optimization is that queue is almost always empty or near empty - if (producerLimit <= producerIndex) { - int result = offerSlowPath(e, mask, buffer, producerIndex, producerLimit); - switch (result) { - case 0: break; - case 1: continue; - case 2: return false; - case 3: return true; - } - } - - if (casProducerIndex(producerIndex, producerIndex + 2)) { - break; - } - } - - long offset = modifiedCalcElementOffset(producerIndex, mask); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e); - return true; - } - - private int offerSlowPath(E e, long mask, E[] buffer, long producerIndex, long producerLimit) { - int result; - long consumerIndex = lvConsumerIndex(); - long maxQueueCapacity = this.maxQueueCapacity; - long bufferCapacity = ((mask + 2) == maxQueueCapacity) ? maxQueueCapacity : mask; - - if (consumerIndex + bufferCapacity > producerIndex) { - if (!casProducerLimit(producerLimit, consumerIndex + bufferCapacity)) { - result = 1;// retry from top - } - result = 0;// 0 - goto producerIndex CAS - } - // full and cannot grow - else if (consumerIndex == (producerIndex - maxQueueCapacity)) { - result = 2;// -> return false; - } - // grab index for resize -> set lower bit - else if (casProducerIndex(producerIndex, producerIndex + 1)) { - // resize will adjust the consumerIndexCache - int newBufferLength = buffer.length; - if (buffer.length - 1 == maxQueueCapacity) { - throw new IllegalStateException(); - } - newBufferLength = 2 * buffer.length - 1; - - @SuppressWarnings("unchecked") - E[] newBuffer = (E[]) new Object[newBufferLength]; - - producerBuffer = newBuffer; - producerMask = (newBufferLength - 2) << 1; - - long offsetInOld = modifiedCalcElementOffset(producerIndex, mask); - long offsetInNew = modifiedCalcElementOffset(producerIndex, producerMask); - UnsafeAccess.UNSAFE.putOrderedObject(newBuffer, offsetInNew, e); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, nextArrayOffset(mask), newBuffer); - long available = maxQueueCapacity - (producerIndex - consumerIndex); - - if (available <= 0) { - throw new IllegalStateException(); - } - // invalidate racing CASs - soProducerLimit(producerIndex + Math.min(mask, available)); - - // make resize visible to consumer - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offsetInOld, JUMP); - - // make resize visible to the other producers - soProducerIndex(producerIndex + 2); - result = 3;// -> return true - } else { - result = 1;// failed resize attempt, retry from top - } - return result; - } - - @SuppressWarnings("PMD.ConfusingTernary") - public E poll() { - E[] buffer = consumerBuffer; - long index = consumerIndex; - long mask = consumerMask; - - long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset);// LoadLoad - if (e == null) { - if (index != lvProducerIndex()) { - // poll() == null iff queue is empty, null element is not strong enough indicator, so we - // must check the producer index. If the queue is indeed not empty we spin until element is - // visible. - do { - e = lvElement(buffer, offset); - } while (e == null); - } else { - return null; - } - } - if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); - return newBufferPoll(nextBuffer, index); - } - soElement(buffer, offset, null); - soConsumerIndex(index + 2); - - @SuppressWarnings("unchecked") - E casted = (E) e; - return casted; - } - - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer - * and consumer indices, therefore protection is required to ensure size is within valid range. - * In the event of concurrent polls/offers to this method the size is OVER estimated as we read - * consumer index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - for (;;) { - long before = after; - long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after) >> 1; - } - } - } - - /** - * This method assumes index is actually (index << 1) because lower bit is used for resize. This - * is compensated for by reducing the element shift. The computation is constant folded, so - * there's no cost. - */ - private static long modifiedCalcElementOffset(long index, long mask) { - return ARRAY_BASE + ((index & mask) << (ELEMENT_SHIFT - 1)); - } - - @SuppressWarnings("unchecked") - private E[] getNextBuffer(E[] buffer, long mask) { - long nextArrayOffset = nextArrayOffset(mask); - E[] nextBuffer = (E[]) UnsafeAccess.UNSAFE.getObjectVolatile(buffer, nextArrayOffset); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, nextArrayOffset, null); - return nextBuffer; - } - - private long nextArrayOffset(long mask) { - return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE); - } - - @SuppressWarnings("unchecked") - private E newBufferPoll(E[] nextBuffer, long index) { - long offsetInNew = newBufferAndOffset(nextBuffer, index); - E n = (E) UnsafeAccess.UNSAFE.getObjectVolatile(nextBuffer, offsetInNew);// LoadLoad - if (n == null) { - throw new IllegalStateException("new buffer must have at least one element"); - } - UnsafeAccess.UNSAFE.putOrderedObject(nextBuffer, offsetInNew, null);// StoreStore - soConsumerIndex(index + 2); - return n; - } - - @SuppressWarnings("PMD.ArrayIsStoredDirectly") - private long newBufferAndOffset(E[] nextBuffer, long index) { - consumerBuffer = nextBuffer; - consumerMask = (nextBuffer.length - 2) << 1; - long offsetInNew = modifiedCalcElementOffset(index, consumerMask); - return offsetInNew; - } - - @SuppressWarnings("unchecked") - public static E lvElement(E[] buffer, long offset) { - return (E) UnsafeAccess.UNSAFE.getObjectVolatile(buffer, offset); - } - - public static void soElement(E[] buffer, long offset, E e) { - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e); - } - - private long lvProducerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); - } - - private long lvConsumerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); - } - - private void soProducerIndex(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); - } - - private boolean casProducerIndex(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } - - private void soConsumerIndex(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); - } - - private long lvProducerLimit() { - return producerLimit; - } - - private boolean casProducerLimit(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue); - } - - private void soProducerLimit(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v); - } - - static { - int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (scale == 4) { - ELEMENT_SHIFT = 2; - } else if (scale == 8) { - ELEMENT_SHIFT = 3; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - } -} - -/** The namespace for field padding through inheritance. */ -final class WBHeader { - @SuppressWarnings("PMD.AbstractClassWithoutAbstractMethod") - static abstract class PadProducerIndex { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16; - } - - static abstract class ProducerIndexRef extends PadProducerIndex { - static final long P_INDEX_OFFSET = - UnsafeAccess.objectFieldOffset(ProducerIndexRef.class, "producerIndex"); - - long producerIndex; - } - - static abstract class PadColdProducer extends ProducerIndexRef { - long p20, p21, p22, p23, p24, p25, p26, p27; - long p30, p31, p32, p33, p34, p35, p36; - } - - static abstract class ColdProducerRef extends PadColdProducer { - static final long P_LIMIT_OFFSET = - UnsafeAccess.objectFieldOffset(ColdProducerRef.class, "producerLimit"); - - long maxQueueCapacity; - long producerMask; - E[] producerBuffer; - volatile long producerLimit; - } - - static abstract class PadConsumer extends ColdProducerRef { - long p40, p41, p42, p43, p44, p45, p46, p47; - long p50, p51, p52, p53, p54, p55, p56; - } - - static abstract class ConsumerRef extends PadConsumer { - static final long C_INDEX_OFFSET = - UnsafeAccess.objectFieldOffset(ConsumerRef.class, "consumerIndex"); - - long consumerMask; - E[] consumerBuffer; - long consumerIndex; - } -} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java index 6fbcabc0f4..0ad0ba9e98 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java @@ -656,7 +656,7 @@ public void coldest_snapshot(Cache cache, CacheContext context @CacheSpec(implementation = Implementation.Caffeine, maximumSize = Maximum.FULL) @Test(dataProvider = "caches", expectedExceptions = UnsupportedOperationException.class) public void hottest_unmodifiable(CacheContext context, Eviction eviction) { - eviction.hottest(Integer.MAX_VALUE).clear();; + eviction.hottest(Integer.MAX_VALUE).clear(); } @CacheSpec(implementation = Implementation.Caffeine, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java similarity index 82% rename from caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java rename to caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java index 17108fe298..4b8f089003 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java @@ -31,7 +31,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -public final class WriteBufferTest { +public final class MpscGrowableArrayQueueTest { private static final int NUM_PRODUCERS = 10; private static final int PRODUCE = 100; @@ -41,31 +41,31 @@ public final class WriteBufferTest { /* ---------------- Size -------------- */ @Test(dataProvider = "empty") - public void size_whenEmpty(WriteBuffer buffer) { + public void size_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.size(), is(0)); } @Test(dataProvider = "populated") - public void size_whenPopulated(WriteBuffer buffer) { + public void size_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.size(), is(POPULATED_SIZE)); } /* ---------------- Offer -------------- */ @Test(dataProvider = "empty") - public void offer_whenEmpty(WriteBuffer buffer) { + public void offer_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(true)); assertThat(buffer.size(), is(1)); } @Test(dataProvider = "populated") - public void offer_whenPopulated(WriteBuffer buffer) { + public void offer_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(true)); assertThat(buffer.size(), is(POPULATED_SIZE + 1)); } @Test(dataProvider = "full") - public void offer_whenFull(WriteBuffer buffer) { + public void offer_whenFull(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(false)); assertThat(buffer.size(), is(FULL_SIZE)); } @@ -73,18 +73,18 @@ public void offer_whenFull(WriteBuffer buffer) { /* ---------------- Poll -------------- */ @Test(dataProvider = "empty") - public void poll_whenEmpty(WriteBuffer buffer) { + public void poll_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.poll(), is(nullValue())); } @Test(dataProvider = "populated") - public void poll_whenPopulated(WriteBuffer buffer) { + public void poll_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.poll(), is(not(nullValue()))); assertThat(buffer.size(), is(POPULATED_SIZE - 1)); } @Test(dataProvider = "full") - public void poll_toEmpty(WriteBuffer buffer) { + public void poll_toEmpty(MpscGrowableArrayQueue buffer) { while (buffer.poll() != null) {} assertThat(buffer.size(), is(0)); } @@ -92,7 +92,7 @@ public void poll_toEmpty(WriteBuffer buffer) { /* ---------------- Concurrency -------------- */ @Test(dataProvider = "empty") - public void oneProducer_oneConsumer(WriteBuffer buffer) { + public void oneProducer_oneConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger started = new AtomicInteger(); AtomicInteger finished = new AtomicInteger(); @@ -118,7 +118,7 @@ public void oneProducer_oneConsumer(WriteBuffer buffer) { } @Test(dataProvider = "empty") - public void manyProducers_noConsumer(WriteBuffer buffer) { + public void manyProducers_noConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger count = new AtomicInteger(); ConcurrentTestHarness.timeTasks(NUM_PRODUCERS, () -> { for (int i = 0; i < PRODUCE; i++) { @@ -131,7 +131,7 @@ public void manyProducers_noConsumer(WriteBuffer buffer) { } @Test(dataProvider = "empty") - public void manyProducers_oneConsumer(WriteBuffer buffer) { + public void manyProducers_oneConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger started = new AtomicInteger(); AtomicInteger finished = new AtomicInteger(); @@ -174,8 +174,8 @@ public Object[][] providesFull() { return new Object[][] {{ makePopulated(FULL_SIZE) }}; } - static WriteBuffer makePopulated(int items) { - WriteBuffer buffer = new WriteBuffer<>(4, FULL_SIZE); + static MpscGrowableArrayQueue makePopulated(int items) { + MpscGrowableArrayQueue buffer = new MpscGrowableArrayQueue<>(4, FULL_SIZE); for (int i = 0; i < items; i++) { buffer.offer(i); } diff --git a/config/findbugs/exclude.xml b/config/findbugs/exclude.xml index 1d92a6dff2..0b423e84f2 100644 --- a/config/findbugs/exclude.xml +++ b/config/findbugs/exclude.xml @@ -34,7 +34,7 @@ - + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9302478fff..323bada727 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -25,19 +25,19 @@ */ ext { versions = [ - akka: '2.4.11', + akka: '2.4.12', commons_compress: '1.12', - commons_lang3: '3.4', + commons_lang3: '3.5', config: '1.3.1', error_prone_annotations: '2.0.13', flip_tables: '1.0.2', - guava: '19.0', + guava: '20.0', javapoet: '1.7.0', jcache: '1.0.0', jsr305: '3.0.1', jsr330: '1', stream: '2.9.5', - univocity_parsers: '2.2.2', + univocity_parsers: '2.2.3', ycsb: '0.11.0', xz: '1.5', ] @@ -46,10 +46,10 @@ ext { easymock: '3.4', hamcrest: '2.0.0.0', jcache_tck: '1.0.1', - jctools: '1.2.1', + jctools: '2.0', junit: '4.12', - mockito: '2.2.1', - pax_exam: '4.9.1', + mockito: '2.2.9', + pax_exam: '4.9.2', testng: '6.9.12', truth: '0.24', ] @@ -57,16 +57,16 @@ ext { cache2k: '0.28-BETA', collision: '0.2.4', concurrentlinkedhashmap: '1.4.2', - ehcache2: '2.10.2.2.21', + ehcache2: '2.10.3', ehcache3: '3.1.3', - elastic_search: '5.0.0-rc1', + elastic_search: '5.0.0', infinispan: '9.0.0.Alpha4', - jackrabbit: '1.5.11', + jackrabbit: '1.5.12', jamm: '0.3.1', java_object_layout: '0.6', koloboke: '0.6.8', slf4j: '1.7.21', - tcache: '0.9.5', + tcache: '0.9.8', ] plugin_versions = [ checkstyle: '7.1.2', @@ -118,13 +118,13 @@ ext { exclude group: 'org.hamcrest' }, osgi_compile: [ - 'org.apache.felix:org.apache.felix.framework:5.6.0', + 'org.apache.felix:org.apache.felix.framework:5.6.1', "org.ops4j.pax.exam:pax-exam-junit4:${test_versions.pax_exam}", ], osgi_runtime: [ "org.ops4j.pax.exam:pax-exam-container-native:${test_versions.pax_exam}", "org.ops4j.pax.exam:pax-exam-link-mvn:${test_versions.pax_exam}", - "org.ops4j.pax.url:pax-url-aether:2.5.0", + "org.ops4j.pax.url:pax-url-aether:2.5.1", ], testng: [ dependencies.create("org.testng:testng:${test_versions.testng}") { @@ -150,7 +150,6 @@ ext { }, jamm: "com.github.jbellis:jamm:${benchmark_versions.jamm}", java_object_layout: "org.openjdk.jol:jol-cli:${benchmark_versions.java_object_layout}", - jctools_experimental: "com.github.JCTools.JCTools:jctools-experimental:v${test_versions.jctools}", koloboke: [ "net.openhft:koloboke-api-jdk8:${benchmark_versions.koloboke}", "net.openhft:koloboke-impl-jdk8:${benchmark_versions.koloboke}", diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 4c464819bd..5577a703c6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.2-rc-1-bin.zip diff --git a/guava/build.gradle b/guava/build.gradle index 656ebcc539..ade2cfb2d8 100644 --- a/guava/build.gradle +++ b/guava/build.gradle @@ -9,6 +9,7 @@ dependencies { testCompile test_libraries.junit testCompile test_libraries.truth + testCompile test_libraries.jctools testCompile test_libraries.easymock testCompile test_libraries.guava_testlib } diff --git a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java index 060ef809ba..b3013177db 100644 --- a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java +++ b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java @@ -19,7 +19,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -36,8 +35,6 @@ import com.google.common.cache.CacheStats; import com.google.common.collect.ForwardingCollection; import com.google.common.collect.ForwardingConcurrentMap; -import com.google.common.collect.ForwardingIterator; -import com.google.common.collect.ForwardingMapEntry; import com.google.common.collect.ForwardingSet; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ExecutionError; @@ -198,26 +195,6 @@ public ConcurrentMap asMap() { @Override public boolean removeIf(Predicate> filter) { return delegate().removeIf(filter); } - @Override - public Iterator> iterator() { - Iterator> iterator = delegate().iterator(); - return new ForwardingIterator>() { - @Override public Entry next() { - Entry entry = delegate().next(); - return new ForwardingMapEntry() { - @Override public V setValue(V value) { - throw new UnsupportedOperationException(); - } - @Override protected Entry delegate() { - return entry; - } - }; - } - @Override protected Iterator> delegate() { - return iterator; - } - }; - } @Override protected Set> delegate() { return cache.asMap().entrySet(); } diff --git a/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java b/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java new file mode 100644 index 0000000000..3adc38ebcf --- /dev/null +++ b/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +@RunWith(Parameterized.class) +public final class MpscGrowableQueueSanityTest extends QueueSanityTest { + @Parameterized.Parameters + public static Collection parameters() { + ArrayList list = new ArrayList(); + // MPSC size 1 + list.add(makeQueue(0, 1, 4, Ordering.FIFO, new MpscGrowableArrayQueue<>(2, 4))); + // MPSC size SIZE + list.add(makeQueue(0, 1, SIZE, Ordering.FIFO, new MpscGrowableArrayQueue<>(8, SIZE))); + return list; + } + + public MpscGrowableQueueSanityTest(ConcurrentQueueSpec spec, Queue queue) { + super(spec, queue); + } +} diff --git a/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java b/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java new file mode 100644 index 0000000000..15ebecd9c9 --- /dev/null +++ b/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java @@ -0,0 +1,317 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeThat; + +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.hamcrest.Matcher; +import org.jctools.queues.QueueFactory; +import org.jctools.queues.atomic.AtomicQueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; +import org.jctools.util.Pow2; +import org.junit.Before; +import org.junit.Test; + +/** + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +public abstract class QueueSanityTest { + + public static final int SIZE = 8192 * 2; + + private final Queue queue; + private final ConcurrentQueueSpec spec; + + public QueueSanityTest(ConcurrentQueueSpec spec, Queue queue) { + this.queue = queue; + this.spec = spec; + } + + @Before + public void clear() { + queue.clear(); + } + + @Test + public void sanity() { + for (int i = 0; i < SIZE; i++) { + assertNull(queue.poll()); + assertThat(queue, emptyAndZeroSize()); + } + int i = 0; + while (i < SIZE && queue.offer(i)) { + i++; + } + int size = i; + assertEquals(size, queue.size()); + if (spec.ordering == Ordering.FIFO) { + // expect FIFO + i = 0; + Integer p; + Integer e; + while ((p = queue.peek()) != null) { + e = queue.poll(); + assertEquals(p, e); + assertEquals(size - (i + 1), queue.size()); + assertEquals(i++, e.intValue()); + } + assertEquals(size, i); + } else { + // expect sum of elements is (size - 1) * size / 2 = 0 + 1 + .... + (size - 1) + int sum = (size - 1) * size / 2; + i = 0; + Integer e; + while ((e = queue.poll()) != null) { + assertEquals(--size, queue.size()); + sum -= e; + } + assertEquals(0, sum); + } + assertNull(queue.poll()); + assertThat(queue, emptyAndZeroSize()); + } + + @Test + public void testSizeIsTheNumberOfOffers() { + int currentSize = 0; + while (currentSize < SIZE && queue.offer(currentSize)) { + currentSize++; + assertThat(queue, hasSize(currentSize)); + } + } + + @Test + public void whenFirstInThenFirstOut() { + assumeThat(spec.ordering, is(Ordering.FIFO)); + + // Arrange + int i = 0; + while (i < SIZE && queue.offer(i)) { + i++; + } + final int size = queue.size(); + + // Act + i = 0; + Integer prev; + while ((prev = queue.peek()) != null) { + final Integer item = queue.poll(); + + assertThat(item, is(prev)); + assertThat(queue, hasSize(size - (i + 1))); + assertThat(item, is(i)); + i++; + } + + // Assert + assertThat(i, is(size)); + } + + @Test(expected = NullPointerException.class) + public void offerNullResultsInNPE() { + queue.offer(null); + } + + @Test + public void whenOfferItemAndPollItemThenSameInstanceReturnedAndQueueIsEmpty() { + assertThat(queue, emptyAndZeroSize()); + + // Act + final Integer e = new Integer(1876876); + queue.offer(e); + assertFalse(queue.isEmpty()); + assertEquals(1, queue.size()); + + final Integer oh = queue.poll(); + assertEquals(e, oh); + + // Assert + assertThat(oh, sameInstance(e)); + assertThat(queue, emptyAndZeroSize()); + } + + @Test + public void testPowerOf2Capacity() { + assumeThat(spec.isBounded(), is(true)); + int n = Pow2.roundToPowerOfTwo(spec.capacity); + + for (int i = 0; i < n; i++) { + assertTrue("Failed to insert:" + i, queue.offer(i)); + } + assertFalse(queue.offer(n)); + } + + static final class Val { + public int value; + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testHappensBefore() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override public void run() { + while (!stop.get()) { + for (int i = 1; i <= 10; i++) { + Val v = new Val(); + v.value = i; + q.offer(v); + } + // slow down the producer, this will make the queue mostly empty encouraging visibility + // issues. + Thread.yield(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + for (int i = 0; i < 10; i++) { + Val v = (Val) q.peek(); + if (v != null && v.value == 0) { + fail.value = 1; + stop.set(true); + } + q.poll(); + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("reordering detected", 0, fail.value); + + } + + @Test + public void testSize() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + q.offer(1); + q.poll(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + int size = q.size(); + if (size != 0 && size != 1) { + fail.value++; + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("Unexpected size observed", 0, fail.value); + + } + + @Test + public void testPollAfterIsEmpty() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + q.offer(1); + // slow down the producer, this will make the queue mostly empty encouraging visibility + // issues. + Thread.yield(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + if (!q.isEmpty() && q.poll() == null) { + fail.value++; + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("Observed no element in non-empty queue", 0, fail.value); + + } + + public static Object[] makeQueue(int producers, int consumers, int capacity, Ordering ordering, + Queue q) { + ConcurrentQueueSpec spec = + new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE); + if (q == null) { + q = QueueFactory.newQueue(spec); + } + return new Object[] {spec, q}; + } + + public static Object[] makeAtomic(int producers, int consumers, int capacity, Ordering ordering, + Queue q) { + ConcurrentQueueSpec spec = + new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE); + if (q == null) { + q = AtomicQueueFactory.newQueue(spec); + } + return new Object[] {spec, q}; + } + + public static Matcher> emptyAndZeroSize() { + return allOf(hasSize(0), empty()); + } +} diff --git a/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java b/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java index 939c2796b4..86377fd291 100644 --- a/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java +++ b/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java @@ -27,8 +27,6 @@ import java.util.Map.Entry; import java.util.Set; -import junit.framework.TestCase; - import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.guava.CaffeinatedGuava; import com.google.common.base.Function; @@ -42,6 +40,8 @@ import com.google.common.collect.Maps; import com.google.common.testing.EqualsTester; +import junit.framework.TestCase; + /** * {@link LoadingCache} tests that deal with caches that actually contain some key-value mappings. * @@ -279,15 +279,19 @@ public void testWriteThroughEntry() { for (LoadingCache cache : caches()) { cache.getUnchecked(1); Entry entry = Iterables.getOnlyElement(cache.asMap().entrySet()); - try { - entry.setValue(3); - fail("expected entry.setValue to throw UnsupportedOperationException"); - } catch (UnsupportedOperationException expected) { - } + + cache.invalidate(1); + assertEquals(0, cache.size()); + + entry.setValue(3); + assertEquals(1, cache.size()); + assertEquals(3, cache.getIfPresent(1)); + checkValidState(cache); + try { entry.setValue(null); - fail("expected entry.setValue(null) to throw UnsupportedOperationException"); - } catch (UnsupportedOperationException expected) { + fail(); + } catch (NullPointerException expected) { } checkValidState(cache); }