From a419a4101d5e7951b72b7cf8691c50ec65702a13 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 29 Mar 2015 21:06:23 -0700 Subject: [PATCH] Dynamically size the read buffer based on contention Previously the read buffer consisted of a fixed number of segments, each consisting of a bounded ring buffer. This provided performance under heavy load, at the cost of high memory when the cache was created. For caches that are not heavily contended this is wasteful, especially when many are constructed. This fixed cost is replaced with a dynamic approach, which starts at a single buffer and expands as needed. The upper limit results in the same number of read buffers, but only under high load. This dramatically reduces memory usage. The approach is based on j.u.c.Striped64, which provides the mechanism for Java's high performance 64-bit atomic counters. --- build.gradle | 6 - .../caffeine/cache/ReadBufferBenchmark.java | 4 +- .../caffeine/cache/BoundedBuffer.java | 227 +++++--------- .../caffeine/cache/BoundedLocalCache.java | 39 ++- .../benmanes/caffeine/cache/Buffer.java | 93 ++++++ .../caffeine/cache/StripedBuffer.java | 286 ++++++++++++++++++ .../caffeine/cache/BoundedBufferTest.java | 12 +- .../caffeine/cache/BoundedLocalCacheTest.java | 16 +- .../cache/IsValidBoundedLocalCache.java | 2 +- .../caffeine/cache/StripedBufferTest.java | 114 +++++++ .../caffeine/cache/buffer/BufferTest.java | 10 +- .../caffeine/cache/buffer/BufferType.java | 6 +- .../caffeine/cache/buffer/FastFlowBuffer.java | 2 +- .../cache/buffer/ManyToOneBuffer.java | 2 +- .../cache/buffer/MpmcArrayBuffer.java | 2 +- .../cache/buffer/MpscArrayBuffer.java | 2 +- .../cache/buffer/MpscCompoundBuffer.java | 2 +- .../buffer/{Buffer.java => ReadBuffer.java} | 2 +- .../caffeine/cache/buffer/TicketBuffer.java | 2 +- gradle/dependencies.gradle | 4 +- 20 files changed, 623 insertions(+), 210 deletions(-) create mode 100644 caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java create mode 100644 caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java rename caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/{Buffer.java => ReadBuffer.java} (97%) diff --git a/build.gradle b/build.gradle index 03da078923..4cb9258d28 100644 --- a/build.gradle +++ b/build.gradle @@ -43,12 +43,6 @@ subprojects { proj -> sourceCompatibility = JavaVersion.VERSION_1_8 - tasks.withType(JavaCompile) { - if (!System.env.'CI') { - options.incremental = !rootProject.hasProperty('release') - } - } - group = 'com.github.ben-manes.caffeine' version.with { major = 1 // incompatible API changes diff --git a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java index 65fd93e3b9..2668a3ecbb 100644 --- a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java +++ b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java @@ -23,7 +23,7 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import com.github.benmanes.caffeine.cache.buffer.Buffer; +import com.github.benmanes.caffeine.cache.buffer.ReadBuffer; import com.github.benmanes.caffeine.cache.buffer.BufferType; /** @@ -46,7 +46,7 @@ public class ReadBufferBenchmark { @Param BufferType bufferType; - Buffer buffer; + ReadBuffer buffer; @Setup public void setup() { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java index 211e458e76..2f9491a278 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java @@ -20,191 +20,110 @@ import java.util.function.Consumer; /** - * A multiple-producer / single-consumer bounded buffer that rejects new elements if it is full or - * fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an - * ordering of elements in either FIFO or LIFO order. - *

- * Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read - * access to the buffer. This implementation does not include fail-fast behavior to guard - * against incorrect consumer usage. + * A striped, non-blocking, bounded buffer. * * @param the type of elements maintained by this buffer * @author ben.manes@gmail.com (Ben Manes) */ -final class BoundedBuffer { - +final class BoundedBuffer extends StripedBuffer { /* - * A segmented, non-blocking, circular ring buffer is used to store the elements being transfered - * by the producers to the consumer. The monotonically increasing count of reads and writes allow - * indexing sequentially to the next element location. The arrays use power-of-two sizing for - * quickly determining to the proper location. + * A circular ring buffer stores the elements being transfered by the producers to the consumer. + * The monotonically increasing count of reads and writes allow indexing sequentially to the next + * element location based upon a power-of-two sizing. * * The producers race to read the counts, check if there is available capacity, and if so then try * once to CAS to the next write count. If the increment is successful then the producer lazily - * publishes the next element. The producer does not retry or block when unsuccessful due to a - * failed CAS or the buffer being full. + * publishes the element. The producer does not retry or block when unsuccessful due to a failed + * CAS or the buffer being full. * * The consumer reads the counts and takes the available elements. The clearing of the elements * and the next read count are lazily set. * - * To further increase concurrency the buffer is internally segmented into multiple ring buffers. - * The number of segments is determined as a size that minimize contention that may cause spurious - * failures for producers. The segment is chosen by a hash of the thread's id. + * This implementation is striped to further increase concurrency by rehashing and dynamically + * adding new buffers when contention is detected, up to an internal maximum. When rehashing in + * order to discover an available buffer, the producer may retry adding its element to determine + * whether it found a satisfactory buffer or if resizing is necessary. */ - /** The number of CPUs */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** The number of read buffers to use. */ - static final int NUMBER_OF_SEGMENTS = 4 * ceilingNextPowerOfTwo(NCPU); - - /** Mask value for indexing into the read buffers. */ - static final int SEGMENT_MASK = NUMBER_OF_SEGMENTS - 1; - /** The maximum number of pending reads per buffer. */ - static final int RING_BUFFER_SIZE = 32; + static final int BUFFER_SIZE = 32; /** Mask value for indexing into the read buffer. */ - static final int RING_BUFFER_MASK = RING_BUFFER_SIZE - 1; + static final int BUFFER_MASK = BUFFER_SIZE - 1; - static int ceilingNextPowerOfTwo(int x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + @Override + protected Buffer create(E e) { + return new RingBuffer<>(e); } - final AtomicLong[] readCount; - final AtomicLong[] writeCount; - final AtomicReference[][] table; - - @SuppressWarnings({"unchecked", "cast", "rawtypes"}) - public BoundedBuffer() { - readCount = new AtomicLong[NUMBER_OF_SEGMENTS]; - writeCount = new AtomicLong[NUMBER_OF_SEGMENTS]; - table = new AtomicReference[NUMBER_OF_SEGMENTS][RING_BUFFER_SIZE]; - for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) { - table[i] = new AtomicReference[RING_BUFFER_SIZE]; - for (int j = 0; j < RING_BUFFER_SIZE; j++) { - table[i][j] = new AtomicReference<>(); + static final class RingBuffer implements Buffer { + final AtomicLong readCounter; + final AtomicLong writeCounter; + final AtomicReference[] buffer; + + @SuppressWarnings({"unchecked", "cast", "rawtypes"}) + public RingBuffer(E e) { + readCounter = new AtomicLong(); + writeCounter = new AtomicLong(1); + buffer = new AtomicReference[BUFFER_SIZE]; + for (int i = 0; i < BUFFER_SIZE; i++) { + buffer[i] = new AtomicReference<>(); } - readCount[i] = new AtomicLong(); - writeCount[i] = new AtomicLong(); + buffer[0].lazySet(e); } - } - - /** - * Inserts the specified element into this buffer if it is possible to do so immediately without - * violating capacity restrictions. The addition is allowed to fail spuriously if multiple - * threads insert concurrently. - * - * @param e the element to add - * @return {@code true} if the element was or could have been added; {@code false} if full - */ - public boolean submit(E e) { - final int segmentIndex = segmentIndex(); - final AtomicLong readCounter = readCount[segmentIndex]; - final AtomicLong writeCounter = writeCount[segmentIndex]; - long head = readCounter.get(); - long tail = writeCounter.get(); - long size = (tail - head); - if (size >= RING_BUFFER_SIZE) { - return false; - } - if (writeCounter.compareAndSet(tail, tail + 1)) { - int index = (int) (tail & RING_BUFFER_MASK); - table[segmentIndex][index].lazySet(e); + @Override + public int offer(E e) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size >= BUFFER_SIZE) { + return Buffer.FULL; + } + if (writeCounter.compareAndSet(tail, tail + 1)) { + int index = (int) (tail & BUFFER_MASK); + buffer[index].lazySet(e); + return Buffer.SUCCESS; + } + return Buffer.FAILED; } - return true; - } - /** - * Drains the buffer, sending each element to the consumer for processing. The caller must ensure - * that a consumer has exclusive read access to the buffer. - * - * @param consumer the action to perform on each element - */ - public void drain(Consumer consumer) { - final int start = segmentIndex(); - final int end = start + NUMBER_OF_SEGMENTS; - for (int i = start; i < end; i++) { - drainSegment(consumer, i & SEGMENT_MASK); + @Override + public void drain(Consumer consumer) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size == 0) { + return; + } + do { + int index = (int) (head & BUFFER_MASK); + AtomicReference slot = buffer[index]; + E e = slot.get(); + if (e == null) { + // not published yet + break; + } + slot.lazySet(null); + consumer.accept(e); + head++; + } while (head != tail); + readCounter.lazySet(head); } - } - /** - * Drains an segment. - * - * @param consumer the action to perform on each element - * @param segmentIndex the segment index in the table - */ - private void drainSegment(Consumer consumer, int segmentIndex) { - final AtomicLong readCounter = readCount[segmentIndex]; - final AtomicLong writeCounter = writeCount[segmentIndex]; - - long head = readCounter.get(); - long tail = writeCounter.get(); - long size = (tail - head); - if (size == 0) { - return; + @Override + public int size() { + return writes() - reads(); } - do { - int index = (int) (head & RING_BUFFER_MASK); - AtomicReference slot = table[segmentIndex][index]; - E e = slot.get(); - if (e == null) { - // not published yet - break; - } - slot.lazySet(null); - consumer.accept(e); - head++; - } while (head != tail); - readCounter.lazySet(head); - } - - /** - * Returns the number of elements residing in the buffer. - * - * @return the number of elements in this buffer - */ - public int size() { - return writes() - reads(); - } - /** - * Returns the number of elements that have been written to the buffer. - * - * @return the number of elements written to this buffer - */ - public int writes() { - int writes = 0; - for (AtomicLong counter : writeCount) { - writes += counter.intValue(); + @Override + public int reads() { + return readCounter.intValue(); } - return writes; - } - /** - * Returns the number of elements that have been read from the buffer. - * - * @return the number of elements read from this buffer - */ - public int reads() { - int reads = 0; - for (AtomicLong counter : readCount) { - reads += counter.intValue(); + @Override + public int writes() { + return writeCounter.intValue(); } - return reads; - } - - /** - * Returns the index to the ring buffer to record into. Uses a one-step FNV-1a hash code - * (http://www.isthe.com/chongo/tech/comp/fnv) based on the current thread's id. These hash codes - * have more uniform distribution properties with respect to small moduli (here 1-31) than do - * other simple hashing functions. - */ - static int segmentIndex() { - int id = (int) Thread.currentThread().getId(); - return ((id ^ 0x811c9dc5) * 0x01000193) & SEGMENT_MASK; } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index b832b4f6fd..d62fee8b78 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -129,7 +129,7 @@ abstract class BoundedLocalCache extends AbstractMap implements Loca // The policy management final AtomicReference drainStatus; - final BoundedBuffer> readBuffer; + final Buffer> readBuffer; final NonReentrantLock evictionLock; final Weigher weigher; final boolean isAsync; @@ -143,7 +143,6 @@ abstract class BoundedLocalCache extends AbstractMap implements Loca protected BoundedLocalCache(Caffeine builder, @Nullable CacheLoader loader, boolean isAsync) { this.isAsync = isAsync; - readBuffer = new BoundedBuffer<>(); weigher = builder.getWeigher(isAsync); evictionLock = new NonReentrantLock(); id = tracer().register(builder.name()); @@ -153,6 +152,9 @@ protected BoundedLocalCache(Caffeine builder, builder.isStrongValues(), builder.isWeakValues(), builder.isSoftValues(), builder.expiresAfterAccess(), builder.expiresAfterWrite(), builder.refreshes(), builder.evicts(), (isAsync && builder.evicts()) || builder.isWeighted()); + readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess() + ? new BoundedBuffer<>() + : Buffer.disabled(); } /** Returns if the node's value is currently being computed, asynchronously. */ @@ -469,23 +471,28 @@ void afterRead(Node node, boolean recordHit) { long now = ticker().read(); node.setAccessTime(now); - boolean delayable = readBuffer.submit(node); + boolean delayable = (readBuffer.offer(node) != Buffer.FULL); drainOnReadIfNeeded(delayable); + refreshIfNeeded(node, now); + } - if (refreshAfterWrite()) { - long writeTime = node.getWriteTime(); - if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) { - executor().execute(() -> { - K key = node.getKey(); - if ((key != null) && node.isAlive()) { - try { - computeIfPresent(key, cacheLoader()::reload); - } catch (Throwable t) { - logger.log(Level.WARNING, "Exception thrown during reload", t); - } + /** Asynchronously refreshes the entry if eligible. */ + void refreshIfNeeded(Node node, long now) { + if (!refreshAfterWrite()) { + return; + } + long writeTime = node.getWriteTime(); + if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) { + executor().execute(() -> { + K key = node.getKey(); + if ((key != null) && node.isAlive()) { + try { + computeIfPresent(key, cacheLoader()::reload); + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown during reload", t); } - }); - } + } + }); } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java new file mode 100644 index 0000000000..21d88cdde1 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java @@ -0,0 +1,93 @@ +/* + * Copyright 2015 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import java.util.function.Consumer; + +import javax.annotation.Nonnull; + +/** + * A multiple-producer / single-consumer buffer that rejects new elements if it is full or + * fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an + * ordering of elements in either FIFO or LIFO order. + *

+ * Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read + * access to the buffer. This implementation does not include fail-fast behavior to guard + * against incorrect consumer usage. + * + * @param the type of elements maintained by this buffer + * @author ben.manes@gmail.com (Ben Manes) + */ +interface Buffer { + static final int FULL = 1; + static final int SUCCESS = 0; + static final int FAILED = -1; + + /** Returns a no-op implementation. */ + @SuppressWarnings("unchecked") + static Buffer disabled() { + return (Buffer) DisabledBuffer.INSTANCE; + } + + /** + * Inserts the specified element into this buffer if it is possible to do so immediately without + * violating capacity restrictions. The addition is allowed to fail spuriously if multiple + * threads insert concurrently. + * + * @param e the element to add + * @return {@code 1} if the buffer is full, {@code -1} if the CAS failed, or {@code 0} if added + */ + int offer(@Nonnull E e); + + /** + * Drains the buffer, sending each element to the consumer for processing. The caller must ensure + * that a consumer has exclusive read access to the buffer. + * + * @param consumer the action to perform on each element + */ + void drain(@Nonnull Consumer consumer); + + /** + * Returns the number of elements residing in the buffer. + * + * @return the number of elements in this buffer + */ + int size(); + + /** + * Returns the number of elements that have been read from the buffer. + * + * @return the number of elements read from this buffer + */ + public int reads(); + + /** + * Returns the number of elements that have been written to the buffer. + * + * @return the number of elements written to this buffer + */ + public int writes(); +} + +enum DisabledBuffer implements Buffer { + INSTANCE; + + @Override public int offer(Object e) { return Buffer.SUCCESS; } + @Override public void drain(Consumer consumer) {} + @Override public int size() { return 0; } + @Override public int reads() { return 0; } + @Override public int writes() { return 0; } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java new file mode 100644 index 0000000000..ba6d3c2de5 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java @@ -0,0 +1,286 @@ +/* + * Copyright 2015 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; + +import com.github.benmanes.caffeine.base.UnsafeAccess; + +/** + * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This + * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64} + * class, which is used to by atomic counters. The approach was modified to lazily grow an array of + * buffers in order to minimize memory usage for caches that are not heavily contended on. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +abstract class StripedBuffer implements Buffer { + /* + * This class maintains a lazily-initialized table of atomically updated buffers. The table size + * is a power of two. Indexing uses masked per-thread hash codes. Nearly all declarations in this + * class are package-private, accessed directly by subclasses. + * + * Table entries are of class Buffer and should be padded to reduce cache contention. Padding is + * overkill for most atomics because they are usually irregularly scattered in memory and thus + * don't interfere much with each other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share cache lines (with a huge negative + * performance impact) without this precaution. + * + * In part because Buffers are relatively large, we avoid creating them until they are needed. + * When there is no contention, all updates are made to a single buffer. Upon contention (a failed + * CAS inserting into the buffer), the table is expanded to size 2. The table size is doubled upon + * further contention until reaching the nearest power of two greater than or equal to the number + * of CPUS. Table slots remain empty (null) until they are needed. + * + * A single spinlock ("tableBusy") is used for initializing and resizing the table, as well as + * populating slots with new Buffers. There is no need for a blocking lock; when the lock is not + * available, threads try other slots. During these retries, there is increased contention and + * reduced locality, which is still better than alternatives. + * + * The Thread probe fields maintained via ThreadLocalRandom serve as per-thread hash codes. We let + * them remain uninitialized as zero (if they come in this way) until they contend at slot 0. They + * are then initialized to values that typically do not often conflict with others. Contention + * and/or table collisions are indicated by failed CASes when performing an update operation. Upon + * a collision, if the table size is less than the capacity, it is doubled in size unless some + * other thread holds the lock. If a hashed slot is empty, and lock is available, a new Buffer is + * created. Otherwise, if the slot exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a free slot. + * + * The table size is capped because, when there are more threads than CPUs, supposing that each + * thread were bound to a CPU, there would exist a perfect hash function mapping threads to slots + * that eliminates collisions. When we reach capacity, we search for this mapping by randomly + * varying the hash codes of colliding threads. Because search is random, and collisions only + * become known via CAS failures, convergence can be slow, and because threads are typically not + * bound to CPUS forever, may not occur at all. However, despite these limitations, observed + * contention rates are typically low in these cases. + * + * It is possible for a Buffer to become unused when threads that once hashed to it terminate, as + * well as in the case where doubling the table causes no thread to hash to it under expanded + * mask. We do not try to detect or remove buffers, under the assumption that for long-running + * instances, observed contention levels will recur, so the buffers will eventually be needed + * again; and for short-lived ones, it does not matter. + */ + + static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy"); + static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); + + /** Number of CPUS. */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** The bound on the table size. */ + static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU); + + /** The maximum number of attempts when trying to expand the table. */ + static final int ATTEMPTS = 3; + + /** Table of buffers. When non-null, size is a power of 2. */ + transient volatile Buffer[] table; + + /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */ + transient volatile int tableBusy; + + /** CASes the tableBusy field from 0 to 1 to acquire lock. */ + final boolean casTableBusy() { + return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1); + } + + /** + * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of + * packaging restrictions. + */ + static final int getProbe() { + return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); + } + + /** + * Pseudo-randomly advances and records the given probe value for the given thread. Duplicated + * from ThreadLocalRandom because of packaging restrictions. + */ + static final int advanceProbe(int probe) { + probe ^= probe << 13; // xorshift + probe ^= probe >>> 17; + probe ^= probe << 5; + UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + return probe; + } + + /** Returns the closest power-of-two at or higher than the given value. */ + static int ceilingNextPowerOfTwo(int x) { + // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + } + + /** + * Creates a new buffer instance after resizing to accommodate a producer. + * + * @param e the producer's element + * @return a newly created buffer populated with a single element + */ + protected abstract Buffer create(E e); + + @Override + public int offer(E e) { + int mask; + int result = 0; + Buffer buffer; + boolean uncontended = true; + Buffer[] buffers = table; + if ((buffers == null) + || (mask = buffers.length - 1) < 0 + || (buffer = buffers[getProbe() & mask]) == null + || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) { + expandOrRetry(e, uncontended); + } + return result; + } + + @Override + public void drain(Consumer consumer) { + Buffer[] buffers = table; + if (buffers == null) { + return; + } + for (Buffer buffer : buffers) { + if (buffer != null) { + buffer.drain(consumer); + } + } + } + + @Override + public int size() { + return writes() - reads(); + } + + @Override + public int reads() { + Buffer[] buffers = table; + if (buffers == null) { + return 0; + } + int reads = 0; + for (Buffer buffer : buffers) { + if (buffer != null) { + reads += buffer.reads(); + } + } + return reads; + } + + @Override + public int writes() { + Buffer[] buffers = table; + if (buffers == null) { + return 0; + } + int writes = 0; + for (Buffer buffer : buffers) { + if (buffer != null) { + writes += buffer.writes(); + } + } + return writes; + } + + /** + * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or + * contention. See above for explanation. This method suffers the usual non-modularity problems of + * optimistic retry code, relying on rechecked sets of reads. + * + * @param e the element to add + * @param wasUncontended false if CAS failed before call + */ + final void expandOrRetry(E e, boolean wasUncontended) { + int h; + if ((h = getProbe()) == 0) { + ThreadLocalRandom.current(); // force initialization + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (int attempt = 0; attempt < ATTEMPTS; attempt++) { + Buffer[] buffers; + Buffer buffer; + int n; + if ((buffers = table) != null && (n = buffers.length) > 0) { + if ((buffer = buffers[(n - 1) & h]) == null) { + if (tableBusy == 0) { // Try to attach new Buffer + if (tableBusy == 0 && casTableBusy()) { + boolean created = false; + try { // Recheck under lock + Buffer[] rs; + int mask, j; + if (((rs = table) != null) && ((mask = rs.length) > 0) + && (rs[j = (mask - 1) & h] == null)) { + rs[j] = create(e); + created = true; + } + } finally { + tableBusy = 0; + } + if (created) { + break; + } + continue; // Slot is now non-empty + } + } + collide = false; + } else if (!wasUncontended) { // CAS already known to fail + wasUncontended = true; // Continue after rehash + } else if (buffer.offer(e) != Buffer.FAILED) { + break; + } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) { + collide = false; // At max size or stale + } else if (!collide) { + collide = true; + } else if (tableBusy == 0 && casTableBusy()) { + try { + if (table == buffers) { // Expand table unless stale + @SuppressWarnings({"unchecked", "rawtypes"}) + Buffer[] rs = new Buffer[n << 1]; + for (int i = 0; i < n; ++i) { + rs[i] = buffers[i]; + } + table = rs; + } + } finally { + tableBusy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h = advanceProbe(h); + } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) { + boolean init = false; + try { // Initialize table + if (table == buffers) { + @SuppressWarnings({"unchecked", "rawtypes"}) + Buffer[] rs = new Buffer[1]; + rs[0] = create(e); + table = rs; + init = true; + } + } finally { + tableBusy = 0; + } + if (init) { + break; + } + } + } + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java index de4fb52452..dbce777f20 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java @@ -42,10 +42,10 @@ public Object[][] buffer() { } @Test(dataProvider = "buffer") - public void submit(BoundedBuffer buffer) { + public void offer(BoundedBuffer buffer) { ConcurrentTestHarness.timeTasks(10, () -> { for (int i = 0; i < 100; i++) { - buffer.submit(DUMMY); + buffer.offer(DUMMY); } }); assertThat(buffer.writes(), is(greaterThan(0))); @@ -54,8 +54,8 @@ public void submit(BoundedBuffer buffer) { @Test(dataProvider = "buffer") public void drain(BoundedBuffer buffer) { - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { - buffer.submit(DUMMY); + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { + buffer.offer(DUMMY); } int[] read = new int[1]; buffer.drain(e -> read[0]++); @@ -64,12 +64,12 @@ public void drain(BoundedBuffer buffer) { } @Test(dataProvider = "buffer") - public void submitAndDrain(BoundedBuffer buffer) { + public void offerAndDrain(BoundedBuffer buffer) { Lock lock = new ReentrantLock(); AtomicInteger reads = new AtomicInteger(); ConcurrentTestHarness.timeTasks(10, () -> { for (int i = 0; i < 1000; i++) { - boolean shouldDrain = !buffer.submit(DUMMY); + boolean shouldDrain = (buffer.offer(DUMMY) == Buffer.FULL); if (shouldDrain && lock.tryLock()) { buffer.drain(e -> reads.incrementAndGet()); lock.unlock(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 82cef0b8bd..8bfb7be3ce 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -253,14 +253,14 @@ public void exceedsMaximumBufferSize_onRead(Cache cache) { BoundedLocalCache localCache = asBoundedLocalCache(cache); Node dummy = localCache.nodeFactory.newNode(null, null, null, 1, 0); - BoundedBuffer> buffer = localCache.readBuffer; - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { - buffer.submit(dummy); + Buffer> buffer = localCache.readBuffer; + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { + buffer.offer(dummy); } - assertThat(buffer.submit(dummy), is(false)); + assertThat(buffer.offer(dummy), is(Buffer.FULL)); localCache.afterRead(dummy, true); - assertThat(buffer.submit(dummy), is(true)); + assertThat(buffer.offer(dummy), is(not(Buffer.FULL))); } @Test(dataProvider = "caches") @@ -283,14 +283,14 @@ public void exceedsMaximumBufferSize_onWrite(Cache cache) { public void drain_onRead(Cache cache, CacheContext context) { BoundedLocalCache localCache = asBoundedLocalCache(cache); - BoundedBuffer> buffer = localCache.readBuffer; - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { + Buffer> buffer = localCache.readBuffer; + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { localCache.get(context.firstKey()); } int pending = buffer.size(); assertThat(buffer.writes(), is(equalTo(pending))); - assertThat(pending, is(BoundedBuffer.RING_BUFFER_SIZE)); + assertThat(pending, is(BoundedBuffer.BUFFER_SIZE)); localCache.get(context.firstKey()); assertThat(buffer.size(), is(0)); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java index 3afce00d2f..5a88fb2a03 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java @@ -72,7 +72,7 @@ private void checkReadBuffer(BoundedLocalCache cache) { if (!cache.evicts() && !cache.expiresAfterAccess()) { return; } - BoundedBuffer buffer = cache.readBuffer; + Buffer buffer = cache.readBuffer; desc.expectThat("buffer is empty", buffer.size(), is(0)); desc.expectThat("buffer reads = writes", buffer.reads(), is(buffer.writes())); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java new file mode 100644 index 0000000000..9eaefefaf4 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2015 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static java.util.Objects.requireNonNull; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; + +import java.util.function.Consumer; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.ConcurrentTestHarness; +import com.google.common.base.MoreObjects; + +/** + * @author ben.manes@gmail.com (Ben Manes) + */ +public final class StripedBufferTest { + static final Integer ELEMENT = 1; + + @Test(dataProvider = "buffers") + public void init(FakeBuffer buffer) { + assertThat(buffer.table, is(nullValue())); + + buffer.offer(ELEMENT); + assertThat(buffer.table.length, is(1)); + } + + @Test(dataProvider = "buffers") + public void produce(FakeBuffer buffer) { + ConcurrentTestHarness.timeTasks(10, () -> { + for (int i = 0; i < 10; i++) { + buffer.offer(ELEMENT); + Thread.yield(); + } + }); + assertThat(buffer.table.length, lessThanOrEqualTo(StripedBuffer.MAXIMUM_TABLE_SIZE)); + } + + @Test(dataProvider = "buffers") + public void drain(FakeBuffer buffer) { + buffer.drain(e -> {}); + assertThat(buffer.drains, is(0)); + + // Expand and drain + buffer.offer(ELEMENT); + buffer.drain(e -> {}); + assertThat(buffer.drains, is(1)); + } + + @DataProvider + public Object[][] buffers() { + return new Object[][] { + { new FakeBuffer(Buffer.FULL) }, + { new FakeBuffer(Buffer.FAILED) }, + { new FakeBuffer(Buffer.SUCCESS) }, + }; + } + + static int ceilingNextPowerOfTwo(int x) { + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + } + + static final class FakeBuffer extends StripedBuffer { + final int result; + int drains = 0; + + FakeBuffer(int result) { + this.result = requireNonNull(result); + } + + @Override protected Buffer create(E e) { + return new Buffer() { + @Override public int offer(E e) { + return result; + } + @Override public void drain(Consumer consumer) { + drains++; + } + @Override public int size() { + return 0; + } + @Override public int reads() { + return 0; + } + @Override public int writes() { + return 0; + } + }; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).addValue(result).toString(); + } + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java index 06a8d8310c..6f707cad39 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java @@ -42,7 +42,7 @@ public Iterator buffers() { } @Test(dataProvider = "buffers") - public void record(Buffer buffer) { + public void record(ReadBuffer buffer) { ConcurrentTestHarness.timeTasks(100, () -> { for (int i = 0; i < 1000; i++) { buffer.record(); @@ -50,12 +50,12 @@ public void record(Buffer buffer) { } }); long recorded = buffer.recorded(); - assertThat(recorded, is((long) Buffer.MAX_SIZE)); + assertThat(recorded, is((long) ReadBuffer.MAX_SIZE)); } @Test(dataProvider = "buffers") - public void drain(Buffer buffer) { - for (int i = 0; i < 2 * Buffer.MAX_SIZE; i++) { + public void drain(ReadBuffer buffer) { + for (int i = 0; i < 2 * ReadBuffer.MAX_SIZE; i++) { buffer.record(); } buffer.drain(); @@ -65,7 +65,7 @@ public void drain(Buffer buffer) { } @Test(dataProvider = "buffers") - public void recordAndDrain(Buffer buffer) { + public void recordAndDrain(ReadBuffer buffer) { ConcurrentTestHarness.timeTasks(100, () -> { for (int i = 0; i < 1000; i++) { boolean shouldDrain = buffer.record(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java index 2848727fb5..2e55b4efd4 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java @@ -30,14 +30,14 @@ public enum BufferType { MpmcArray(MpmcArrayBuffer::new), MpscCompound(MpscCompoundBuffer::new); - private final Supplier factory; + private final Supplier factory; - private BufferType(Supplier factory) { + private BufferType(Supplier factory) { this.factory = factory; } /** Returns a new buffer instance. */ - public Buffer create() { + public ReadBuffer create() { return factory.get(); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java index 4acdf088cc..e428e036a2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java @@ -33,7 +33,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class FastFlowBuffer implements Buffer { +final class FastFlowBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong readCache; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java index 4b8eb0552d..d57a0fd663 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java @@ -29,7 +29,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class ManyToOneBuffer implements Buffer { +final class ManyToOneBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong readCounter; final AtomicLong writeCounter; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java index 8117c3fc4d..3f84ecb88d 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpmcArrayBuffer implements Buffer { +final class MpmcArrayBuffer implements ReadBuffer { final MpmcArrayQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java index 8ead9a3ee1..c868c71fde 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpscArrayBuffer implements Buffer { +final class MpscArrayBuffer implements ReadBuffer { final MpscArrayQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java index e3f29c5a46..f5e0a1ce49 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpscCompoundBuffer implements Buffer { +final class MpscCompoundBuffer implements ReadBuffer { final MpscCompoundQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java similarity index 97% rename from caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java rename to caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java index b2edd7b751..55b9124404 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java @@ -21,7 +21,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -public interface Buffer { +public interface ReadBuffer { static final int MAX_SIZE = 32; // power of 2 static final int MAX_SIZE_MASK = MAX_SIZE - 1; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java index a3d85760c4..85bf69df89 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java @@ -33,7 +33,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class TicketBuffer implements Buffer { +final class TicketBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong writeCounter; final AtomicReference[] buffer; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 227d50e358..c4bd9bf766 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -51,13 +51,13 @@ ext { benchmark_versions = [ concurrentlinkedhashmap: '1.4.2', high_scale_lib: '1.0.6', - jamm: '0.3.0', + jamm: '0.3.1', java_object_layout: '0.3.1', koloboke: '0.6.6', ] plugin_versions = [ bundle: '0.6.2', - checkstyle: '6.4.1', + checkstyle: '6.5', coveralls: '2.3.1', error_prone: '0.0.6', jmh: '0.2.0',