diff --git a/caffeine/build.gradle b/caffeine/build.gradle index 876b8523ae..5ca45be13e 100644 --- a/caffeine/build.gradle +++ b/caffeine/build.gradle @@ -41,7 +41,6 @@ dependencies { jmh benchmark_libraries.infinispan jmh benchmark_libraries.jackrabbit jmh benchmark_libraries.elastic_search - jmh benchmark_libraries.jctools_experimental jmh benchmark_libraries.concurrentlinkedhashmap javaPoetCompile libraries.guava diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java index 8b57026c8d..9b9d043aab 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java @@ -99,7 +99,8 @@ public final class Specifications { public static final TypeName WRITE_ORDER_DEQUE = ParameterizedTypeName.get( ClassName.get(PACKAGE_NAME, "WriteOrderDeque"), NODE); - public static final ClassName WRITE_QUEUE_TYPE = ClassName.get(PACKAGE_NAME, "WriteBuffer"); + public static final ClassName WRITE_QUEUE_TYPE = + ClassName.get(PACKAGE_NAME, "MpscGrowableArrayQueue"); public static final TypeName WRITE_QUEUE = ParameterizedTypeName.get( WRITE_QUEUE_TYPE, ClassName.get(Runnable.class)); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java b/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java index c2ab6e3e19..8a748e0717 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/base/UnsafeAccess.java @@ -29,6 +29,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ +@SuppressWarnings("restriction") public final class UnsafeAccess { static final String ANDROID = "THE_ONE"; static final String OPEN_JDK = "theUnsafe"; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 5514142a2b..015a19edd6 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -235,7 +235,7 @@ protected boolean buffersWrites() { return false; } - protected WriteBuffer writeBuffer() { + protected MpscGrowableArrayQueue writeBuffer() { throw new UnsupportedOperationException(); } @@ -634,6 +634,7 @@ void evictFromMain(int candidates) { * @param victimKey the key for the entry chosen by the eviction policy for replacement * @return if the candidate should be admitted and the victim ejected */ + @GuardedBy("evictionLock") boolean admit(K candidateKey, K victimKey) { int victimFreq = frequencySketch().frequency(victimKey); int candidateFreq = frequencySketch().frequency(candidateKey); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java new file mode 100644 index 0000000000..d248e6a187 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java @@ -0,0 +1,658 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static com.github.benmanes.caffeine.base.UnsafeAccess.UNSAFE; +import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ARRAY_BASE; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.lvElement; +import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.soElement; + +import java.lang.reflect.Field; +import java.util.AbstractQueue; +import java.util.Iterator; + +/** + * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in + * linked chunks of the initial size. The queue grows only when the current buffer is full and + * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer + * for the consumer to follow.
+ *

+ * This is a shaded copy of MpscGrowableArrayQueue provided by + * JCTools from version 2.0. + * + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +final class MpscGrowableArrayQueue extends MpscChunkedArrayQueue { + + /** + * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the + * chunk size. Must be 2 or more. + * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will + * be the upper limit of number of elements in this queue. Must be 4 or more and round up + * to a larger power of 2 than initialCapacity. + */ + public MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) { + super(initialCapacity, maxCapacity); + } + + @Override + protected int getNextBufferSize(E[] buffer) { + long maxSize = maxQueueCapacity / 2; + if (buffer.length > maxSize) { + throw new IllegalStateException(); + } + final int newSize = 2 * (buffer.length - 1); + return newSize + 1; + } + + @Override + protected long getCurrentBufferCapacity(long mask) { + return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask; + } +} + +abstract class MpscChunkedArrayQueue extends MpscChunkedArrayQueueColdProducerFields { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; + + public MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) { + super(initialCapacity, maxCapacity); + } + + @Override + protected long availableInQueue(long pIndex, long cIndex) { + return maxQueueCapacity - (pIndex - cIndex); + } + + @Override + public int capacity() { + return (int) (maxQueueCapacity / 2); + } + + @Override + protected int getNextBufferSize(E[] buffer) { + return buffer.length; + } + + @Override + protected long getCurrentBufferCapacity(long mask) { + return mask; + } +} + + +abstract class MpscChunkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueue { + protected final long maxQueueCapacity; + + public MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) { + super(initialCapacity); + if (maxCapacity < 4) { + throw new IllegalArgumentException("Max capacity must be 4 or more"); + } + if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) { + throw new IllegalArgumentException( + "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)"); + } + maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1; + } +} + +abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue { + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + + +abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedArrayQueuePad1 { + protected long producerIndex; +} + +abstract class BaseMpscLinkedArrayQueuePad2 extends BaseMpscLinkedArrayQueueProducerFields { + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedArrayQueuePad2 { + protected long consumerMask; + protected E[] consumerBuffer; + protected long consumerIndex; +} + +abstract class BaseMpscLinkedArrayQueuePad3 extends BaseMpscLinkedArrayQueueConsumerFields { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +abstract class BaseMpscLinkedArrayQueueColdProducerFields + extends BaseMpscLinkedArrayQueuePad3 { + protected volatile long producerLimit; + protected long producerMask; + protected E[] producerBuffer; +} + +@SuppressWarnings({"PMD", "restriction"}) +abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields { + // No post padding here, subclasses must add + + private final static long P_INDEX_OFFSET; + private final static long C_INDEX_OFFSET; + private final static long P_LIMIT_OFFSET; + + static { + try { + Field iField = BaseMpscLinkedArrayQueueProducerFields.class.getDeclaredField("producerIndex"); + P_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + try { + Field iField = BaseMpscLinkedArrayQueueConsumerFields.class.getDeclaredField("consumerIndex"); + C_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + try { + Field iField = + BaseMpscLinkedArrayQueueColdProducerFields.class.getDeclaredField("producerLimit"); + P_LIMIT_OFFSET = UNSAFE.objectFieldOffset(iField); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + private final static Object JUMP = new Object(); + + /** + * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the + * chunk size. Must be 2 or more. + */ + public BaseMpscLinkedArrayQueue(final int initialCapacity) { + if (initialCapacity < 2) { + throw new IllegalArgumentException("Initial capacity must be 2 or more"); + } + + int p2capacity = ceilingPowerOfTwo(initialCapacity); + // leave lower bit of mask clear + long mask = (p2capacity - 1) << 1; + // need extra element to point at next array + E[] buffer = allocate(p2capacity + 1); + producerBuffer = buffer; + producerMask = mask; + consumerBuffer = buffer; + consumerMask = mask; + soProducerLimit(mask); // we know it's all empty to start with + } + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return getClass().getName() + "@" + Integer.toHexString(hashCode()); + } + + @Override + public boolean offer(final E e) { + if (null == e) { + throw new NullPointerException(); + } + + long mask; + E[] buffer; + long pIndex; + + while (true) { + long producerLimit = lvProducerLimit(); + pIndex = lvProducerIndex(); + // lower bit is indicative of resize, if we see it we spin until it's cleared + if ((pIndex & 1) == 1) { + continue; + } + // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1) + + // mask/buffer may get changed by resizing -> only use for array access after successful CAS. + mask = this.producerMask; + buffer = this.producerBuffer; + // a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex) + + // assumption behind this optimization is that queue is almost always empty or near empty + if (producerLimit <= pIndex) { + int result = offerSlowPath(mask, pIndex, producerLimit); + switch (result) { + case 0: + break; + case 1: + continue; + case 2: + return false; + case 3: + resize(mask, buffer, pIndex, e); + return true; + } + } + + if (casProducerIndex(pIndex, pIndex + 2)) { + break; + } + } + // INDEX visible before ELEMENT, consistent with consumer expectation + final long offset = modifiedCalcElementOffset(pIndex, mask); + soElement(buffer, offset, e); + return true; + } + + /** + * We do not inline resize into this method because we do not resize on fill. + */ + private int offerSlowPath(long mask, long pIndex, long producerLimit) { + int result; + final long cIndex = lvConsumerIndex(); + long bufferCapacity = getCurrentBufferCapacity(mask); + result = 0;// 0 - goto pIndex CAS + if (cIndex + bufferCapacity > pIndex) { + if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { + result = 1;// retry from top + } + } + // full and cannot grow + else if (availableInQueue(pIndex, cIndex) <= 0) { + result = 2;// -> return false; + } + // grab index for resize -> set lower bit + else if (casProducerIndex(pIndex, pIndex + 1)) { + result = 3;// -> resize + } else { + result = 1;// failed resize attempt, retry from top + } + return result; + } + + /** + * @return available elements in queue * 2 + */ + protected abstract long availableInQueue(long pIndex, final long cIndex); + + /** + * This method assumes index is actually (index << 1) because lower bit is used for resize. This + * is compensated for by reducing the element shift. The computation is constant folded, so + * there's no cost. + */ + private static long modifiedCalcElementOffset(long index, long mask) { + return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1)); + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E poll() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null) { + if (index != lvProducerIndex()) { + // poll() == null iff queue is empty, null element is not strong enough indicator, so we + // must + // check the producer index. If the queue is indeed not empty we spin until element is + // visible. + do { + e = lvElement(buffer, offset); + } while (e == null); + } else { + return null; + } + } + if (e == JUMP) { + final E[] nextBuffer = getNextBuffer(buffer, mask); + return newBufferPoll(nextBuffer, index); + } + soElement(buffer, offset, null); + soConsumerIndex(index + 2); + return (E) e; + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E peek() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null && index != lvProducerIndex()) { + // peek() == null iff queue is empty, null element is not strong enough indicator, so we must + // check the producer index. If the queue is indeed not empty we spin until element is + // visible. + while ((e = lvElement(buffer, offset)) == null) { + ; + } + } + if (e == JUMP) { + return newBufferPeek(getNextBuffer(buffer, mask), index); + } + return (E) e; + } + + @SuppressWarnings("unchecked") + private E[] getNextBuffer(final E[] buffer, final long mask) { + final long nextArrayOffset = nextArrayOffset(mask); + final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset); + soElement(buffer, nextArrayOffset, null); + return nextBuffer; + } + + private long nextArrayOffset(final long mask) { + return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE); + } + + private E newBufferPoll(E[] nextBuffer, final long index) { + final long offsetInNew = newBufferAndOffset(nextBuffer, index); + final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad + if (n == null) { + throw new IllegalStateException("new buffer must have at least one element"); + } + soElement(nextBuffer, offsetInNew, null);// StoreStore + soConsumerIndex(index + 2); + return n; + } + + private E newBufferPeek(E[] nextBuffer, final long index) { + final long offsetInNew = newBufferAndOffset(nextBuffer, index); + final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad + if (null == n) { + throw new IllegalStateException("new buffer must have at least one element"); + } + return n; + } + + private long newBufferAndOffset(E[] nextBuffer, final long index) { + consumerBuffer = nextBuffer; + consumerMask = (nextBuffer.length - 2) << 1; + final long offsetInNew = modifiedCalcElementOffset(index, consumerMask); + return offsetInNew; + } + + @Override + public final int size() { + // NOTE: because indices are on even numbers we cannot use the size util. + + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer + * and consumer indices, therefore protection is required to ensure size is within valid range. + * In the event of concurrent polls/offers to this method the size is OVER estimated as we read + * consumer index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + long size; + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + size = ((currentProducerIndex - after) >> 1); + break; + } + } + // Long overflow is impossible, so size is always positive. Integer overflow is possible for the + // unbounded + // indexed queues. + if (size > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) size; + } + } + + @Override + public final boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there + // is + // nothing we can do to make this an exact method. + return (this.lvConsumerIndex() == this.lvProducerIndex()); + } + + private long lvProducerIndex() { + return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + } + + private long lvConsumerIndex() { + return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + } + + private void soProducerIndex(long v) { + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); + } + + private boolean casProducerIndex(long expect, long newValue) { + return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); + } + + private void soConsumerIndex(long v) { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); + } + + private long lvProducerLimit() { + return producerLimit; + } + + private boolean casProducerLimit(long expect, long newValue) { + return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue); + } + + private void soProducerLimit(long v) { + UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v); + } + + public long currentProducerIndex() { + return lvProducerIndex() / 2; + } + + public long currentConsumerIndex() { + return lvConsumerIndex() / 2; + } + + public abstract int capacity(); + + public boolean relaxedOffer(E e) { + return offer(e); + } + + @SuppressWarnings("unchecked") + public E relaxedPoll() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == null) { + return null; + } + if (e == JUMP) { + final E[] nextBuffer = getNextBuffer(buffer, mask); + return newBufferPoll(nextBuffer, index); + } + soElement(buffer, offset, null); + soConsumerIndex(index + 2); + return (E) e; + } + + @SuppressWarnings("unchecked") + public E relaxedPeek() { + final E[] buffer = consumerBuffer; + final long index = consumerIndex; + final long mask = consumerMask; + + final long offset = modifiedCalcElementOffset(index, mask); + Object e = lvElement(buffer, offset);// LoadLoad + if (e == JUMP) { + return newBufferPeek(getNextBuffer(buffer, mask), index); + } + return (E) e; + } + + private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) { + int newBufferLength = getNextBufferSize(oldBuffer); + final E[] newBuffer = allocate(newBufferLength); + + producerBuffer = newBuffer; + final int newMask = (newBufferLength - 2) << 1; + producerMask = newMask; + + final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask); + final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask); + + + soElement(newBuffer, offsetInNew, e);// element in new array + soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked + + // ASSERT code + final long cIndex = lvConsumerIndex(); + final long availableInQueue = availableInQueue(pIndex, cIndex); + if (availableInQueue <= 0) { + throw new IllegalStateException(); + } + + // Invalidate racing CASs + // We never set the limit beyond the bounds of a buffer + soProducerLimit(pIndex + Math.min(newMask, availableInQueue)); + + // make resize visible to the other producers + soProducerIndex(pIndex + 2); + + // INDEX visible before ELEMENT, consistent with consumer expectation + + // make resize visible to consumer + soElement(oldBuffer, offsetInOld, JUMP); + } + + @SuppressWarnings("unchecked") + public static E[] allocate(int capacity) { + return (E[]) new Object[capacity]; + } + + /** + * @return next buffer size(inclusive of next array pointer) + */ + protected abstract int getNextBufferSize(E[] buffer); + + /** + * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2 + */ + protected abstract long getCurrentBufferCapacity(long mask); +} + + +/** + * A concurrent access enabling class used by circular array based queues this class exposes an + * offset computation method along with differently memory fenced load/store methods into the + * underlying array. The class is pre-padded and the array is padded on either side to help with + * False sharing prvention. It is expected theat subclasses handle post padding. + *

+ * Offset calculation is separate from access to enable the reuse of a give compute offset. + *

+ * Load/Store methods using a buffer parameter are provided to allow the prevention of final + * field reload after a LoadLoad barrier. + *

+ */ +@SuppressWarnings("restriction") +final class UnsafeRefArrayAccess { + public static final long REF_ARRAY_BASE; + public static final int REF_ELEMENT_SHIFT; + static { + final int scale = UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) { + REF_ELEMENT_SHIFT = 2; + } else if (8 == scale) { + REF_ELEMENT_SHIFT = 3; + } else { + throw new IllegalStateException("Unknown pointer size"); + } + REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class); + } + + private UnsafeRefArrayAccess() {} + + /** + * A plain store (no ordering/fences) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @param e an orderly kitty + */ + public static void spElement(E[] buffer, long offset, E e) { + UNSAFE.putObject(buffer, offset, e); + } + + /** + * An ordered store(store + StoreStore barrier) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset} + * @param e an orderly kitty + */ + public static void soElement(E[] buffer, long offset, E e) { + UNSAFE.putOrderedObject(buffer, offset, e); + } + + /** + * A plain load (no ordering/fences) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + public static E lpElement(E[] buffer, long offset) { + return (E) UNSAFE.getObject(buffer, offset); + } + + /** + * A volatile load (load + LoadLoad barrier) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + public static E lvElement(E[] buffer, long offset) { + return (E) UNSAFE.getObjectVolatile(buffer, offset); + } + + /** + * @param index desirable element index + * @return the offset in bytes within the array for a given index. + */ + public static long calcElementOffset(long index) { + return REF_ARRAY_BASE + (index << REF_ELEMENT_SHIFT); + } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java deleted file mode 100644 index 6b1da59bc7..0000000000 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/WriteBuffer.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.benmanes.caffeine.cache; - -import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo; -import static java.util.Objects.requireNonNull; - -import com.github.benmanes.caffeine.base.UnsafeAccess; -import com.github.benmanes.caffeine.cache.WBHeader.ConsumerRef; - -/** - * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in - * linked chunks of the initial size. The queue grows only when the current buffer is full and - * elements are not copied on resize, instead a link to the new buffer is stored in the old buffer - * for the consumer to follow. - *

- * This is a trimmed down version of MpscChunkedArrayQueue provided by - * JCTools. - * - * @author nitsanw@yahoo.com (Nitsan Wakart) - * @author ben.manes@gmail.com (Ben Manes) - */ -@SuppressWarnings("restriction") -final class WriteBuffer extends ConsumerRef { - long p60, p61, p62, p63, p64, p65, p66, p67; - long p70, p71, p72, p73, p74, p75, p76; - - static final long ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class); - static final Object JUMP = new Object(); - static final int ELEMENT_SHIFT; - - /** - * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the - * chunk size. Must be 2 or more. - * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will - * be the upper limit of number of elements in this queue. Must be 4 or more and round up - * to a larger power of 2 than initialCapacity. - */ - public WriteBuffer(int initialCapacity, int maxCapacity) { - Caffeine.requireArgument(maxCapacity > 4); - Caffeine.requireArgument(initialCapacity > 2); - if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) { - throw new IllegalArgumentException( - "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)"); - } - - int p2capacity = ceilingPowerOfTwo(initialCapacity); - // leave lower bit of mask clear - long mask = (p2capacity - 1) << 1; - // need extra element to point at next array - @SuppressWarnings("unchecked") - E[] buffer = (E[]) new Object[p2capacity + 1]; - producerBuffer = buffer; - producerMask = mask; - consumerBuffer = buffer; - consumerMask = mask; - maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1; - soProducerLimit(mask); // we know it's all empty to start with - } - - @SuppressWarnings({"PMD.SwitchStmtsShouldHaveDefault", "PMD.MissingBreakInSwitch"}) - public boolean offer(E e) { - requireNonNull(e); - - long mask; - E[] buffer; - long producerIndex; - for (;;) { - long producerLimit = lvProducerLimit(); - producerIndex = lvProducerIndex(); - - // lower bit is indicative of resize, if we see it we spin until it's cleared - if ((producerIndex & 1) == 1) { - continue; - } - // producerIndex is even (lower bit is 0) -> actual index is (producerIndex >> 1) - - // mask/buffer may get changed by resizing -> only use for array access after successful CAS - mask = producerMask; - buffer = producerBuffer; - // a successful CAS ties the ordering, lv(producerIndex)-[mask/buffer]->cas(producerIndex) - - // assumption behind this optimization is that queue is almost always empty or near empty - if (producerLimit <= producerIndex) { - int result = offerSlowPath(e, mask, buffer, producerIndex, producerLimit); - switch (result) { - case 0: break; - case 1: continue; - case 2: return false; - case 3: return true; - } - } - - if (casProducerIndex(producerIndex, producerIndex + 2)) { - break; - } - } - - long offset = modifiedCalcElementOffset(producerIndex, mask); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e); - return true; - } - - private int offerSlowPath(E e, long mask, E[] buffer, long producerIndex, long producerLimit) { - int result; - long consumerIndex = lvConsumerIndex(); - long maxQueueCapacity = this.maxQueueCapacity; - long bufferCapacity = ((mask + 2) == maxQueueCapacity) ? maxQueueCapacity : mask; - - if (consumerIndex + bufferCapacity > producerIndex) { - if (!casProducerLimit(producerLimit, consumerIndex + bufferCapacity)) { - result = 1;// retry from top - } - result = 0;// 0 - goto producerIndex CAS - } - // full and cannot grow - else if (consumerIndex == (producerIndex - maxQueueCapacity)) { - result = 2;// -> return false; - } - // grab index for resize -> set lower bit - else if (casProducerIndex(producerIndex, producerIndex + 1)) { - // resize will adjust the consumerIndexCache - int newBufferLength = buffer.length; - if (buffer.length - 1 == maxQueueCapacity) { - throw new IllegalStateException(); - } - newBufferLength = 2 * buffer.length - 1; - - @SuppressWarnings("unchecked") - E[] newBuffer = (E[]) new Object[newBufferLength]; - - producerBuffer = newBuffer; - producerMask = (newBufferLength - 2) << 1; - - long offsetInOld = modifiedCalcElementOffset(producerIndex, mask); - long offsetInNew = modifiedCalcElementOffset(producerIndex, producerMask); - UnsafeAccess.UNSAFE.putOrderedObject(newBuffer, offsetInNew, e); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, nextArrayOffset(mask), newBuffer); - long available = maxQueueCapacity - (producerIndex - consumerIndex); - - if (available <= 0) { - throw new IllegalStateException(); - } - // invalidate racing CASs - soProducerLimit(producerIndex + Math.min(mask, available)); - - // make resize visible to consumer - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offsetInOld, JUMP); - - // make resize visible to the other producers - soProducerIndex(producerIndex + 2); - result = 3;// -> return true - } else { - result = 1;// failed resize attempt, retry from top - } - return result; - } - - @SuppressWarnings("PMD.ConfusingTernary") - public E poll() { - E[] buffer = consumerBuffer; - long index = consumerIndex; - long mask = consumerMask; - - long offset = modifiedCalcElementOffset(index, mask); - Object e = lvElement(buffer, offset);// LoadLoad - if (e == null) { - if (index != lvProducerIndex()) { - // poll() == null iff queue is empty, null element is not strong enough indicator, so we - // must check the producer index. If the queue is indeed not empty we spin until element is - // visible. - do { - e = lvElement(buffer, offset); - } while (e == null); - } else { - return null; - } - } - if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); - return newBufferPoll(nextBuffer, index); - } - soElement(buffer, offset, null); - soConsumerIndex(index + 2); - - @SuppressWarnings("unchecked") - E casted = (E) e; - return casted; - } - - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer - * and consumer indices, therefore protection is required to ensure size is within valid range. - * In the event of concurrent polls/offers to this method the size is OVER estimated as we read - * consumer index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - for (;;) { - long before = after; - long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after) >> 1; - } - } - } - - /** - * This method assumes index is actually (index << 1) because lower bit is used for resize. This - * is compensated for by reducing the element shift. The computation is constant folded, so - * there's no cost. - */ - private static long modifiedCalcElementOffset(long index, long mask) { - return ARRAY_BASE + ((index & mask) << (ELEMENT_SHIFT - 1)); - } - - @SuppressWarnings("unchecked") - private E[] getNextBuffer(E[] buffer, long mask) { - long nextArrayOffset = nextArrayOffset(mask); - E[] nextBuffer = (E[]) UnsafeAccess.UNSAFE.getObjectVolatile(buffer, nextArrayOffset); - UnsafeAccess.UNSAFE.putOrderedObject(buffer, nextArrayOffset, null); - return nextBuffer; - } - - private long nextArrayOffset(long mask) { - return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE); - } - - @SuppressWarnings("unchecked") - private E newBufferPoll(E[] nextBuffer, long index) { - long offsetInNew = newBufferAndOffset(nextBuffer, index); - E n = (E) UnsafeAccess.UNSAFE.getObjectVolatile(nextBuffer, offsetInNew);// LoadLoad - if (n == null) { - throw new IllegalStateException("new buffer must have at least one element"); - } - UnsafeAccess.UNSAFE.putOrderedObject(nextBuffer, offsetInNew, null);// StoreStore - soConsumerIndex(index + 2); - return n; - } - - @SuppressWarnings("PMD.ArrayIsStoredDirectly") - private long newBufferAndOffset(E[] nextBuffer, long index) { - consumerBuffer = nextBuffer; - consumerMask = (nextBuffer.length - 2) << 1; - long offsetInNew = modifiedCalcElementOffset(index, consumerMask); - return offsetInNew; - } - - @SuppressWarnings("unchecked") - public static E lvElement(E[] buffer, long offset) { - return (E) UnsafeAccess.UNSAFE.getObjectVolatile(buffer, offset); - } - - public static void soElement(E[] buffer, long offset, E e) { - UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e); - } - - private long lvProducerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); - } - - private long lvConsumerIndex() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); - } - - private void soProducerIndex(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); - } - - private boolean casProducerIndex(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } - - private void soConsumerIndex(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); - } - - private long lvProducerLimit() { - return producerLimit; - } - - private boolean casProducerLimit(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue); - } - - private void soProducerLimit(long v) { - UnsafeAccess.UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v); - } - - static { - int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (scale == 4) { - ELEMENT_SHIFT = 2; - } else if (scale == 8) { - ELEMENT_SHIFT = 3; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - } -} - -/** The namespace for field padding through inheritance. */ -final class WBHeader { - @SuppressWarnings("PMD.AbstractClassWithoutAbstractMethod") - static abstract class PadProducerIndex { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16; - } - - static abstract class ProducerIndexRef extends PadProducerIndex { - static final long P_INDEX_OFFSET = - UnsafeAccess.objectFieldOffset(ProducerIndexRef.class, "producerIndex"); - - long producerIndex; - } - - static abstract class PadColdProducer extends ProducerIndexRef { - long p20, p21, p22, p23, p24, p25, p26, p27; - long p30, p31, p32, p33, p34, p35, p36; - } - - static abstract class ColdProducerRef extends PadColdProducer { - static final long P_LIMIT_OFFSET = - UnsafeAccess.objectFieldOffset(ColdProducerRef.class, "producerLimit"); - - long maxQueueCapacity; - long producerMask; - E[] producerBuffer; - volatile long producerLimit; - } - - static abstract class PadConsumer extends ColdProducerRef { - long p40, p41, p42, p43, p44, p45, p46, p47; - long p50, p51, p52, p53, p54, p55, p56; - } - - static abstract class ConsumerRef extends PadConsumer { - static final long C_INDEX_OFFSET = - UnsafeAccess.objectFieldOffset(ConsumerRef.class, "consumerIndex"); - - long consumerMask; - E[] consumerBuffer; - long consumerIndex; - } -} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java index 6fbcabc0f4..0ad0ba9e98 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/EvictionTest.java @@ -656,7 +656,7 @@ public void coldest_snapshot(Cache cache, CacheContext context @CacheSpec(implementation = Implementation.Caffeine, maximumSize = Maximum.FULL) @Test(dataProvider = "caches", expectedExceptions = UnsupportedOperationException.class) public void hottest_unmodifiable(CacheContext context, Eviction eviction) { - eviction.hottest(Integer.MAX_VALUE).clear();; + eviction.hottest(Integer.MAX_VALUE).clear(); } @CacheSpec(implementation = Implementation.Caffeine, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java similarity index 82% rename from caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java rename to caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java index 17108fe298..4b8f089003 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/WriteBufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueueTest.java @@ -31,7 +31,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -public final class WriteBufferTest { +public final class MpscGrowableArrayQueueTest { private static final int NUM_PRODUCERS = 10; private static final int PRODUCE = 100; @@ -41,31 +41,31 @@ public final class WriteBufferTest { /* ---------------- Size -------------- */ @Test(dataProvider = "empty") - public void size_whenEmpty(WriteBuffer buffer) { + public void size_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.size(), is(0)); } @Test(dataProvider = "populated") - public void size_whenPopulated(WriteBuffer buffer) { + public void size_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.size(), is(POPULATED_SIZE)); } /* ---------------- Offer -------------- */ @Test(dataProvider = "empty") - public void offer_whenEmpty(WriteBuffer buffer) { + public void offer_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(true)); assertThat(buffer.size(), is(1)); } @Test(dataProvider = "populated") - public void offer_whenPopulated(WriteBuffer buffer) { + public void offer_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(true)); assertThat(buffer.size(), is(POPULATED_SIZE + 1)); } @Test(dataProvider = "full") - public void offer_whenFull(WriteBuffer buffer) { + public void offer_whenFull(MpscGrowableArrayQueue buffer) { assertThat(buffer.offer(1), is(false)); assertThat(buffer.size(), is(FULL_SIZE)); } @@ -73,18 +73,18 @@ public void offer_whenFull(WriteBuffer buffer) { /* ---------------- Poll -------------- */ @Test(dataProvider = "empty") - public void poll_whenEmpty(WriteBuffer buffer) { + public void poll_whenEmpty(MpscGrowableArrayQueue buffer) { assertThat(buffer.poll(), is(nullValue())); } @Test(dataProvider = "populated") - public void poll_whenPopulated(WriteBuffer buffer) { + public void poll_whenPopulated(MpscGrowableArrayQueue buffer) { assertThat(buffer.poll(), is(not(nullValue()))); assertThat(buffer.size(), is(POPULATED_SIZE - 1)); } @Test(dataProvider = "full") - public void poll_toEmpty(WriteBuffer buffer) { + public void poll_toEmpty(MpscGrowableArrayQueue buffer) { while (buffer.poll() != null) {} assertThat(buffer.size(), is(0)); } @@ -92,7 +92,7 @@ public void poll_toEmpty(WriteBuffer buffer) { /* ---------------- Concurrency -------------- */ @Test(dataProvider = "empty") - public void oneProducer_oneConsumer(WriteBuffer buffer) { + public void oneProducer_oneConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger started = new AtomicInteger(); AtomicInteger finished = new AtomicInteger(); @@ -118,7 +118,7 @@ public void oneProducer_oneConsumer(WriteBuffer buffer) { } @Test(dataProvider = "empty") - public void manyProducers_noConsumer(WriteBuffer buffer) { + public void manyProducers_noConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger count = new AtomicInteger(); ConcurrentTestHarness.timeTasks(NUM_PRODUCERS, () -> { for (int i = 0; i < PRODUCE; i++) { @@ -131,7 +131,7 @@ public void manyProducers_noConsumer(WriteBuffer buffer) { } @Test(dataProvider = "empty") - public void manyProducers_oneConsumer(WriteBuffer buffer) { + public void manyProducers_oneConsumer(MpscGrowableArrayQueue buffer) { AtomicInteger started = new AtomicInteger(); AtomicInteger finished = new AtomicInteger(); @@ -174,8 +174,8 @@ public Object[][] providesFull() { return new Object[][] {{ makePopulated(FULL_SIZE) }}; } - static WriteBuffer makePopulated(int items) { - WriteBuffer buffer = new WriteBuffer<>(4, FULL_SIZE); + static MpscGrowableArrayQueue makePopulated(int items) { + MpscGrowableArrayQueue buffer = new MpscGrowableArrayQueue<>(4, FULL_SIZE); for (int i = 0; i < items; i++) { buffer.offer(i); } diff --git a/config/findbugs/exclude.xml b/config/findbugs/exclude.xml index 1d92a6dff2..0b423e84f2 100644 --- a/config/findbugs/exclude.xml +++ b/config/findbugs/exclude.xml @@ -34,7 +34,7 @@ - + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9302478fff..323bada727 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -25,19 +25,19 @@ */ ext { versions = [ - akka: '2.4.11', + akka: '2.4.12', commons_compress: '1.12', - commons_lang3: '3.4', + commons_lang3: '3.5', config: '1.3.1', error_prone_annotations: '2.0.13', flip_tables: '1.0.2', - guava: '19.0', + guava: '20.0', javapoet: '1.7.0', jcache: '1.0.0', jsr305: '3.0.1', jsr330: '1', stream: '2.9.5', - univocity_parsers: '2.2.2', + univocity_parsers: '2.2.3', ycsb: '0.11.0', xz: '1.5', ] @@ -46,10 +46,10 @@ ext { easymock: '3.4', hamcrest: '2.0.0.0', jcache_tck: '1.0.1', - jctools: '1.2.1', + jctools: '2.0', junit: '4.12', - mockito: '2.2.1', - pax_exam: '4.9.1', + mockito: '2.2.9', + pax_exam: '4.9.2', testng: '6.9.12', truth: '0.24', ] @@ -57,16 +57,16 @@ ext { cache2k: '0.28-BETA', collision: '0.2.4', concurrentlinkedhashmap: '1.4.2', - ehcache2: '2.10.2.2.21', + ehcache2: '2.10.3', ehcache3: '3.1.3', - elastic_search: '5.0.0-rc1', + elastic_search: '5.0.0', infinispan: '9.0.0.Alpha4', - jackrabbit: '1.5.11', + jackrabbit: '1.5.12', jamm: '0.3.1', java_object_layout: '0.6', koloboke: '0.6.8', slf4j: '1.7.21', - tcache: '0.9.5', + tcache: '0.9.8', ] plugin_versions = [ checkstyle: '7.1.2', @@ -118,13 +118,13 @@ ext { exclude group: 'org.hamcrest' }, osgi_compile: [ - 'org.apache.felix:org.apache.felix.framework:5.6.0', + 'org.apache.felix:org.apache.felix.framework:5.6.1', "org.ops4j.pax.exam:pax-exam-junit4:${test_versions.pax_exam}", ], osgi_runtime: [ "org.ops4j.pax.exam:pax-exam-container-native:${test_versions.pax_exam}", "org.ops4j.pax.exam:pax-exam-link-mvn:${test_versions.pax_exam}", - "org.ops4j.pax.url:pax-url-aether:2.5.0", + "org.ops4j.pax.url:pax-url-aether:2.5.1", ], testng: [ dependencies.create("org.testng:testng:${test_versions.testng}") { @@ -150,7 +150,6 @@ ext { }, jamm: "com.github.jbellis:jamm:${benchmark_versions.jamm}", java_object_layout: "org.openjdk.jol:jol-cli:${benchmark_versions.java_object_layout}", - jctools_experimental: "com.github.JCTools.JCTools:jctools-experimental:v${test_versions.jctools}", koloboke: [ "net.openhft:koloboke-api-jdk8:${benchmark_versions.koloboke}", "net.openhft:koloboke-impl-jdk8:${benchmark_versions.koloboke}", diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 4c464819bd..5577a703c6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.2-rc-1-bin.zip diff --git a/guava/build.gradle b/guava/build.gradle index 656ebcc539..ade2cfb2d8 100644 --- a/guava/build.gradle +++ b/guava/build.gradle @@ -9,6 +9,7 @@ dependencies { testCompile test_libraries.junit testCompile test_libraries.truth + testCompile test_libraries.jctools testCompile test_libraries.easymock testCompile test_libraries.guava_testlib } diff --git a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java index 060ef809ba..b3013177db 100644 --- a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java +++ b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaCache.java @@ -19,7 +19,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -36,8 +35,6 @@ import com.google.common.cache.CacheStats; import com.google.common.collect.ForwardingCollection; import com.google.common.collect.ForwardingConcurrentMap; -import com.google.common.collect.ForwardingIterator; -import com.google.common.collect.ForwardingMapEntry; import com.google.common.collect.ForwardingSet; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ExecutionError; @@ -198,26 +195,6 @@ public ConcurrentMap asMap() { @Override public boolean removeIf(Predicate> filter) { return delegate().removeIf(filter); } - @Override - public Iterator> iterator() { - Iterator> iterator = delegate().iterator(); - return new ForwardingIterator>() { - @Override public Entry next() { - Entry entry = delegate().next(); - return new ForwardingMapEntry() { - @Override public V setValue(V value) { - throw new UnsupportedOperationException(); - } - @Override protected Entry delegate() { - return entry; - } - }; - } - @Override protected Iterator> delegate() { - return iterator; - } - }; - } @Override protected Set> delegate() { return cache.asMap().entrySet(); } diff --git a/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java b/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java new file mode 100644 index 0000000000..3adc38ebcf --- /dev/null +++ b/guava/src/test/java/com/github/benmanes/caffeine/cache/MpscGrowableQueueSanityTest.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +@RunWith(Parameterized.class) +public final class MpscGrowableQueueSanityTest extends QueueSanityTest { + @Parameterized.Parameters + public static Collection parameters() { + ArrayList list = new ArrayList(); + // MPSC size 1 + list.add(makeQueue(0, 1, 4, Ordering.FIFO, new MpscGrowableArrayQueue<>(2, 4))); + // MPSC size SIZE + list.add(makeQueue(0, 1, SIZE, Ordering.FIFO, new MpscGrowableArrayQueue<>(8, SIZE))); + return list; + } + + public MpscGrowableQueueSanityTest(ConcurrentQueueSpec spec, Queue queue) { + super(spec, queue); + } +} diff --git a/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java b/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java new file mode 100644 index 0000000000..15ebecd9c9 --- /dev/null +++ b/guava/src/test/java/com/github/benmanes/caffeine/cache/QueueSanityTest.java @@ -0,0 +1,317 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeThat; + +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.hamcrest.Matcher; +import org.jctools.queues.QueueFactory; +import org.jctools.queues.atomic.AtomicQueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; +import org.jctools.util.Pow2; +import org.junit.Before; +import org.junit.Test; + +/** + * @author nitsanw@yahoo.com (Nitsan Wakart) + */ +public abstract class QueueSanityTest { + + public static final int SIZE = 8192 * 2; + + private final Queue queue; + private final ConcurrentQueueSpec spec; + + public QueueSanityTest(ConcurrentQueueSpec spec, Queue queue) { + this.queue = queue; + this.spec = spec; + } + + @Before + public void clear() { + queue.clear(); + } + + @Test + public void sanity() { + for (int i = 0; i < SIZE; i++) { + assertNull(queue.poll()); + assertThat(queue, emptyAndZeroSize()); + } + int i = 0; + while (i < SIZE && queue.offer(i)) { + i++; + } + int size = i; + assertEquals(size, queue.size()); + if (spec.ordering == Ordering.FIFO) { + // expect FIFO + i = 0; + Integer p; + Integer e; + while ((p = queue.peek()) != null) { + e = queue.poll(); + assertEquals(p, e); + assertEquals(size - (i + 1), queue.size()); + assertEquals(i++, e.intValue()); + } + assertEquals(size, i); + } else { + // expect sum of elements is (size - 1) * size / 2 = 0 + 1 + .... + (size - 1) + int sum = (size - 1) * size / 2; + i = 0; + Integer e; + while ((e = queue.poll()) != null) { + assertEquals(--size, queue.size()); + sum -= e; + } + assertEquals(0, sum); + } + assertNull(queue.poll()); + assertThat(queue, emptyAndZeroSize()); + } + + @Test + public void testSizeIsTheNumberOfOffers() { + int currentSize = 0; + while (currentSize < SIZE && queue.offer(currentSize)) { + currentSize++; + assertThat(queue, hasSize(currentSize)); + } + } + + @Test + public void whenFirstInThenFirstOut() { + assumeThat(spec.ordering, is(Ordering.FIFO)); + + // Arrange + int i = 0; + while (i < SIZE && queue.offer(i)) { + i++; + } + final int size = queue.size(); + + // Act + i = 0; + Integer prev; + while ((prev = queue.peek()) != null) { + final Integer item = queue.poll(); + + assertThat(item, is(prev)); + assertThat(queue, hasSize(size - (i + 1))); + assertThat(item, is(i)); + i++; + } + + // Assert + assertThat(i, is(size)); + } + + @Test(expected = NullPointerException.class) + public void offerNullResultsInNPE() { + queue.offer(null); + } + + @Test + public void whenOfferItemAndPollItemThenSameInstanceReturnedAndQueueIsEmpty() { + assertThat(queue, emptyAndZeroSize()); + + // Act + final Integer e = new Integer(1876876); + queue.offer(e); + assertFalse(queue.isEmpty()); + assertEquals(1, queue.size()); + + final Integer oh = queue.poll(); + assertEquals(e, oh); + + // Assert + assertThat(oh, sameInstance(e)); + assertThat(queue, emptyAndZeroSize()); + } + + @Test + public void testPowerOf2Capacity() { + assumeThat(spec.isBounded(), is(true)); + int n = Pow2.roundToPowerOfTwo(spec.capacity); + + for (int i = 0; i < n; i++) { + assertTrue("Failed to insert:" + i, queue.offer(i)); + } + assertFalse(queue.offer(n)); + } + + static final class Val { + public int value; + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testHappensBefore() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override public void run() { + while (!stop.get()) { + for (int i = 1; i <= 10; i++) { + Val v = new Val(); + v.value = i; + q.offer(v); + } + // slow down the producer, this will make the queue mostly empty encouraging visibility + // issues. + Thread.yield(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + for (int i = 0; i < 10; i++) { + Val v = (Val) q.peek(); + if (v != null && v.value == 0) { + fail.value = 1; + stop.set(true); + } + q.poll(); + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("reordering detected", 0, fail.value); + + } + + @Test + public void testSize() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + q.offer(1); + q.poll(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + int size = q.size(); + if (size != 0 && size != 1) { + fail.value++; + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("Unexpected size observed", 0, fail.value); + + } + + @Test + public void testPollAfterIsEmpty() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final Queue q = queue; + final Val fail = new Val(); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + q.offer(1); + // slow down the producer, this will make the queue mostly empty encouraging visibility + // issues. + Thread.yield(); + } + } + }); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + if (!q.isEmpty() && q.poll() == null) { + fail.value++; + } + } + } + }); + + t1.start(); + t2.start(); + Thread.sleep(1000); + stop.set(true); + t1.join(); + t2.join(); + assertEquals("Observed no element in non-empty queue", 0, fail.value); + + } + + public static Object[] makeQueue(int producers, int consumers, int capacity, Ordering ordering, + Queue q) { + ConcurrentQueueSpec spec = + new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE); + if (q == null) { + q = QueueFactory.newQueue(spec); + } + return new Object[] {spec, q}; + } + + public static Object[] makeAtomic(int producers, int consumers, int capacity, Ordering ordering, + Queue q) { + ConcurrentQueueSpec spec = + new ConcurrentQueueSpec(producers, consumers, capacity, ordering, Preference.NONE); + if (q == null) { + q = AtomicQueueFactory.newQueue(spec); + } + return new Object[] {spec, q}; + } + + public static Matcher> emptyAndZeroSize() { + return allOf(hasSize(0), empty()); + } +} diff --git a/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java b/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java index 939c2796b4..86377fd291 100644 --- a/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java +++ b/guava/src/test/java/com/google/common/cache/PopulatedCachesTest.java @@ -27,8 +27,6 @@ import java.util.Map.Entry; import java.util.Set; -import junit.framework.TestCase; - import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.guava.CaffeinatedGuava; import com.google.common.base.Function; @@ -42,6 +40,8 @@ import com.google.common.collect.Maps; import com.google.common.testing.EqualsTester; +import junit.framework.TestCase; + /** * {@link LoadingCache} tests that deal with caches that actually contain some key-value mappings. * @@ -279,15 +279,19 @@ public void testWriteThroughEntry() { for (LoadingCache cache : caches()) { cache.getUnchecked(1); Entry entry = Iterables.getOnlyElement(cache.asMap().entrySet()); - try { - entry.setValue(3); - fail("expected entry.setValue to throw UnsupportedOperationException"); - } catch (UnsupportedOperationException expected) { - } + + cache.invalidate(1); + assertEquals(0, cache.size()); + + entry.setValue(3); + assertEquals(1, cache.size()); + assertEquals(3, cache.getIfPresent(1)); + checkValidState(cache); + try { entry.setValue(null); - fail("expected entry.setValue(null) to throw UnsupportedOperationException"); - } catch (UnsupportedOperationException expected) { + fail(); + } catch (NullPointerException expected) { } checkValidState(cache); }