diff --git a/build.gradle b/build.gradle index 03da078923..4cb9258d28 100644 --- a/build.gradle +++ b/build.gradle @@ -43,12 +43,6 @@ subprojects { proj -> sourceCompatibility = JavaVersion.VERSION_1_8 - tasks.withType(JavaCompile) { - if (!System.env.'CI') { - options.incremental = !rootProject.hasProperty('release') - } - } - group = 'com.github.ben-manes.caffeine' version.with { major = 1 // incompatible API changes diff --git a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java index 65fd93e3b9..2668a3ecbb 100644 --- a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java +++ b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/ReadBufferBenchmark.java @@ -23,7 +23,7 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import com.github.benmanes.caffeine.cache.buffer.Buffer; +import com.github.benmanes.caffeine.cache.buffer.ReadBuffer; import com.github.benmanes.caffeine.cache.buffer.BufferType; /** @@ -46,7 +46,7 @@ public class ReadBufferBenchmark { @Param BufferType bufferType; - Buffer buffer; + ReadBuffer buffer; @Setup public void setup() { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java index 211e458e76..2f9491a278 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java @@ -20,191 +20,110 @@ import java.util.function.Consumer; /** - * A multiple-producer / single-consumer bounded buffer that rejects new elements if it is full or - * fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an - * ordering of elements in either FIFO or LIFO order. - *
- * Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read
- * access to the buffer. This implementation does not include fail-fast behavior to guard
- * against incorrect consumer usage.
+ * A striped, non-blocking, bounded buffer.
  *
  * @param <E> the type of elements maintained by this buffer
  * @author ben.manes@gmail.com (Ben Manes)
  */
-final class BoundedBuffer<E> {
-
+final class BoundedBuffer<E> extends StripedBuffer<E> {
   /*
-   * A segmented, non-blocking, circular ring buffer is used to store the elements being transfered
-   * by the producers to the consumer. The monotonically increasing count of reads and writes allow
-   * indexing sequentially to the next element location. The arrays use power-of-two sizing for
-   * quickly determining to the proper location.
+   * A circular ring buffer stores the elements being transferred by the producers to the consumer.
+   * The monotonically increasing counts of reads and writes allow indexing sequentially to the next
+   * element location based upon a power-of-two sizing.
    *
    * The producers race to read the counts, check if there is available capacity, and if so then try
    * once to CAS to the next write count. If the increment is successful then the producer lazily
-   * publishes the next element. The producer does not retry or block when unsuccessful due to a
-   * failed CAS or the buffer being full.
+   * publishes the element. The producer does not retry or block when unsuccessful due to a failed
+   * CAS or the buffer being full.
    *
    * The consumer reads the counts and takes the available elements. The clearing of the elements
    * and the next read count are lazily set.
    *
-   * To further increase concurrency the buffer is internally segmented into multiple ring buffers.
-   * The number of segments is determined as a size that minimize contention that may cause spurious
-   * failures for producers. The segment is chosen by a hash of the thread's id.
+   * This implementation is striped to further increase concurrency by rehashing and dynamically
+   * adding new buffers when contention is detected, up to an internal maximum. When rehashing in
+   * order to discover an available buffer, the producer may retry adding its element to determine
+   * whether it found a satisfactory buffer or if resizing is necessary.
    */

-  /** The number of CPUs */
-  static final int NCPU = Runtime.getRuntime().availableProcessors();
-
-  /** The number of read buffers to use. */
-  static final int NUMBER_OF_SEGMENTS = 4 * ceilingNextPowerOfTwo(NCPU);
-
-  /** Mask value for indexing into the read buffers. */
-  static final int SEGMENT_MASK = NUMBER_OF_SEGMENTS - 1;
-
   /** The maximum number of pending reads per buffer. */
-  static final int RING_BUFFER_SIZE = 32;
+  static final int BUFFER_SIZE = 32;

   /** Mask value for indexing into the read buffer. */
-  static final int RING_BUFFER_MASK = RING_BUFFER_SIZE - 1;
+  static final int BUFFER_MASK = BUFFER_SIZE - 1;

-  static int ceilingNextPowerOfTwo(int x) {
-    // From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
- return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + @Override + protected Buffer create(E e) { + return new RingBuffer<>(e); } - final AtomicLong[] readCount; - final AtomicLong[] writeCount; - final AtomicReference[][] table; - - @SuppressWarnings({"unchecked", "cast", "rawtypes"}) - public BoundedBuffer() { - readCount = new AtomicLong[NUMBER_OF_SEGMENTS]; - writeCount = new AtomicLong[NUMBER_OF_SEGMENTS]; - table = new AtomicReference[NUMBER_OF_SEGMENTS][RING_BUFFER_SIZE]; - for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) { - table[i] = new AtomicReference[RING_BUFFER_SIZE]; - for (int j = 0; j < RING_BUFFER_SIZE; j++) { - table[i][j] = new AtomicReference<>(); + static final class RingBuffer implements Buffer { + final AtomicLong readCounter; + final AtomicLong writeCounter; + final AtomicReference[] buffer; + + @SuppressWarnings({"unchecked", "cast", "rawtypes"}) + public RingBuffer(E e) { + readCounter = new AtomicLong(); + writeCounter = new AtomicLong(1); + buffer = new AtomicReference[BUFFER_SIZE]; + for (int i = 0; i < BUFFER_SIZE; i++) { + buffer[i] = new AtomicReference<>(); } - readCount[i] = new AtomicLong(); - writeCount[i] = new AtomicLong(); + buffer[0].lazySet(e); } - } - - /** - * Inserts the specified element into this buffer if it is possible to do so immediately without - * violating capacity restrictions. The addition is allowed to fail spuriously if multiple - * threads insert concurrently. - * - * @param e the element to add - * @return {@code true} if the element was or could have been added; {@code false} if full - */ - public boolean submit(E e) { - final int segmentIndex = segmentIndex(); - final AtomicLong readCounter = readCount[segmentIndex]; - final AtomicLong writeCounter = writeCount[segmentIndex]; - long head = readCounter.get(); - long tail = writeCounter.get(); - long size = (tail - head); - if (size >= RING_BUFFER_SIZE) { - return false; - } - if (writeCounter.compareAndSet(tail, tail + 1)) { - int index = (int) (tail & RING_BUFFER_MASK); - table[segmentIndex][index].lazySet(e); + @Override + public int offer(E e) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size >= BUFFER_SIZE) { + return Buffer.FULL; + } + if (writeCounter.compareAndSet(tail, tail + 1)) { + int index = (int) (tail & BUFFER_MASK); + buffer[index].lazySet(e); + return Buffer.SUCCESS; + } + return Buffer.FAILED; } - return true; - } - /** - * Drains the buffer, sending each element to the consumer for processing. The caller must ensure - * that a consumer has exclusive read access to the buffer. - * - * @param consumer the action to perform on each element - */ - public void drain(Consumer consumer) { - final int start = segmentIndex(); - final int end = start + NUMBER_OF_SEGMENTS; - for (int i = start; i < end; i++) { - drainSegment(consumer, i & SEGMENT_MASK); + @Override + public void drain(Consumer consumer) { + long head = readCounter.get(); + long tail = writeCounter.get(); + long size = (tail - head); + if (size == 0) { + return; + } + do { + int index = (int) (head & BUFFER_MASK); + AtomicReference slot = buffer[index]; + E e = slot.get(); + if (e == null) { + // not published yet + break; + } + slot.lazySet(null); + consumer.accept(e); + head++; + } while (head != tail); + readCounter.lazySet(head); } - } - /** - * Drains an segment. 
- * - * @param consumer the action to perform on each element - * @param segmentIndex the segment index in the table - */ - private void drainSegment(Consumer consumer, int segmentIndex) { - final AtomicLong readCounter = readCount[segmentIndex]; - final AtomicLong writeCounter = writeCount[segmentIndex]; - - long head = readCounter.get(); - long tail = writeCounter.get(); - long size = (tail - head); - if (size == 0) { - return; + @Override + public int size() { + return writes() - reads(); } - do { - int index = (int) (head & RING_BUFFER_MASK); - AtomicReference slot = table[segmentIndex][index]; - E e = slot.get(); - if (e == null) { - // not published yet - break; - } - slot.lazySet(null); - consumer.accept(e); - head++; - } while (head != tail); - readCounter.lazySet(head); - } - - /** - * Returns the number of elements residing in the buffer. - * - * @return the number of elements in this buffer - */ - public int size() { - return writes() - reads(); - } - /** - * Returns the number of elements that have been written to the buffer. - * - * @return the number of elements written to this buffer - */ - public int writes() { - int writes = 0; - for (AtomicLong counter : writeCount) { - writes += counter.intValue(); + @Override + public int reads() { + return readCounter.intValue(); } - return writes; - } - /** - * Returns the number of elements that have been read from the buffer. - * - * @return the number of elements read from this buffer - */ - public int reads() { - int reads = 0; - for (AtomicLong counter : readCount) { - reads += counter.intValue(); + @Override + public int writes() { + return writeCounter.intValue(); } - return reads; - } - - /** - * Returns the index to the ring buffer to record into. Uses a one-step FNV-1a hash code - * (http://www.isthe.com/chongo/tech/comp/fnv) based on the current thread's id. These hash codes - * have more uniform distribution properties with respect to small moduli (here 1-31) than do - * other simple hashing functions. 
- */ - static int segmentIndex() { - int id = (int) Thread.currentThread().getId(); - return ((id ^ 0x811c9dc5) * 0x01000193) & SEGMENT_MASK; } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index b832b4f6fd..d62fee8b78 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -129,7 +129,7 @@ abstract class BoundedLocalCache extends AbstractMap implements Loca // The policy management final AtomicReference drainStatus; - final BoundedBuffer> readBuffer; + final Buffer> readBuffer; final NonReentrantLock evictionLock; final Weigher weigher; final boolean isAsync; @@ -143,7 +143,6 @@ abstract class BoundedLocalCache extends AbstractMap implements Loca protected BoundedLocalCache(Caffeine builder, @Nullable CacheLoader loader, boolean isAsync) { this.isAsync = isAsync; - readBuffer = new BoundedBuffer<>(); weigher = builder.getWeigher(isAsync); evictionLock = new NonReentrantLock(); id = tracer().register(builder.name()); @@ -153,6 +152,9 @@ protected BoundedLocalCache(Caffeine builder, builder.isStrongValues(), builder.isWeakValues(), builder.isSoftValues(), builder.expiresAfterAccess(), builder.expiresAfterWrite(), builder.refreshes(), builder.evicts(), (isAsync && builder.evicts()) || builder.isWeighted()); + readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess() + ? new BoundedBuffer<>() + : Buffer.disabled(); } /** Returns if the node's value is currently being computed, asynchronously. */ @@ -469,23 +471,28 @@ void afterRead(Node node, boolean recordHit) { long now = ticker().read(); node.setAccessTime(now); - boolean delayable = readBuffer.submit(node); + boolean delayable = (readBuffer.offer(node) != Buffer.FULL); drainOnReadIfNeeded(delayable); + refreshIfNeeded(node, now); + } - if (refreshAfterWrite()) { - long writeTime = node.getWriteTime(); - if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) { - executor().execute(() -> { - K key = node.getKey(); - if ((key != null) && node.isAlive()) { - try { - computeIfPresent(key, cacheLoader()::reload); - } catch (Throwable t) { - logger.log(Level.WARNING, "Exception thrown during reload", t); - } + /** Asynchronously refreshes the entry if eligible. */ + void refreshIfNeeded(Node node, long now) { + if (!refreshAfterWrite()) { + return; + } + long writeTime = node.getWriteTime(); + if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) { + executor().execute(() -> { + K key = node.getKey(); + if ((key != null) && node.isAlive()) { + try { + computeIfPresent(key, cacheLoader()::reload); + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown during reload", t); } - }); - } + } + }); } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java new file mode 100644 index 0000000000..21d88cdde1 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Buffer.java @@ -0,0 +1,93 @@ +/* + * Copyright 2015 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import java.util.function.Consumer; + +import javax.annotation.Nonnull; + +/** + * A multiple-producer / single-consumer buffer that rejects new elements if it is full or + * fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an + * ordering of elements in either FIFO or LIFO order. + *
+ * Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read
+ * access to the buffer. This implementation does not include fail-fast behavior to guard
+ * against incorrect consumer usage.
+ *
+ * @param <E> the type of elements maintained by this buffer
+ * @author ben.manes@gmail.com (Ben Manes)
+ */
+interface Buffer<E> {
+  static final int FULL = 1;
+  static final int SUCCESS = 0;
+  static final int FAILED = -1;
+
+  /** Returns a no-op implementation. */
+  @SuppressWarnings("unchecked")
+  static <E> Buffer<E> disabled() {
+    return (Buffer<E>) DisabledBuffer.INSTANCE;
+  }
+
+  /**
+   * Inserts the specified element into this buffer if it is possible to do so immediately without
+   * violating capacity restrictions. The addition is allowed to fail spuriously if multiple
+   * threads insert concurrently.
+   *
+   * @param e the element to add
+   * @return {@code 1} if the buffer is full, {@code -1} if the CAS failed, or {@code 0} if added
+   */
+  int offer(@Nonnull E e);
+
+  /**
+   * Drains the buffer, sending each element to the consumer for processing. The caller must ensure
+   * that a consumer has exclusive read access to the buffer.
+   *
+   * @param consumer the action to perform on each element
+   */
+  void drain(@Nonnull Consumer<E> consumer);
+
+  /**
+   * Returns the number of elements residing in the buffer.
+   *
+   * @return the number of elements in this buffer
+   */
+  int size();
+
+  /**
+   * Returns the number of elements that have been read from the buffer.
+   *
+   * @return the number of elements read from this buffer
+   */
+  public int reads();
+
+  /**
+   * Returns the number of elements that have been written to the buffer.
+   *
+   * @return the number of elements written to this buffer
+   */
+  public int writes();
+}
+
+enum DisabledBuffer implements Buffer<Object> {
+  INSTANCE;
+
+  @Override public int offer(Object e) { return Buffer.SUCCESS; }
+  @Override public void drain(Consumer<Object> consumer) {}
+  @Override public int size() { return 0; }
+  @Override public int reads() { return 0; }
+  @Override public int writes() { return 0; }
+}
diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java
new file mode 100644
index 0000000000..ba6d3c2de5
--- /dev/null
+++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2015 Ben Manes. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.benmanes.caffeine.cache;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+
+import com.github.benmanes.caffeine.base.UnsafeAccess;
+
+/**
+ * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
+ * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
+ * class, which is used by atomic counters.
The approach was modified to lazily grow an array of + * buffers in order to minimize memory usage for caches that are not heavily contended on. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +abstract class StripedBuffer implements Buffer { + /* + * This class maintains a lazily-initialized table of atomically updated buffers. The table size + * is a power of two. Indexing uses masked per-thread hash codes. Nearly all declarations in this + * class are package-private, accessed directly by subclasses. + * + * Table entries are of class Buffer and should be padded to reduce cache contention. Padding is + * overkill for most atomics because they are usually irregularly scattered in memory and thus + * don't interfere much with each other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share cache lines (with a huge negative + * performance impact) without this precaution. + * + * In part because Buffers are relatively large, we avoid creating them until they are needed. + * When there is no contention, all updates are made to a single buffer. Upon contention (a failed + * CAS inserting into the buffer), the table is expanded to size 2. The table size is doubled upon + * further contention until reaching the nearest power of two greater than or equal to the number + * of CPUS. Table slots remain empty (null) until they are needed. + * + * A single spinlock ("tableBusy") is used for initializing and resizing the table, as well as + * populating slots with new Buffers. There is no need for a blocking lock; when the lock is not + * available, threads try other slots. During these retries, there is increased contention and + * reduced locality, which is still better than alternatives. + * + * The Thread probe fields maintained via ThreadLocalRandom serve as per-thread hash codes. We let + * them remain uninitialized as zero (if they come in this way) until they contend at slot 0. They + * are then initialized to values that typically do not often conflict with others. Contention + * and/or table collisions are indicated by failed CASes when performing an update operation. Upon + * a collision, if the table size is less than the capacity, it is doubled in size unless some + * other thread holds the lock. If a hashed slot is empty, and lock is available, a new Buffer is + * created. Otherwise, if the slot exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a free slot. + * + * The table size is capped because, when there are more threads than CPUs, supposing that each + * thread were bound to a CPU, there would exist a perfect hash function mapping threads to slots + * that eliminates collisions. When we reach capacity, we search for this mapping by randomly + * varying the hash codes of colliding threads. Because search is random, and collisions only + * become known via CAS failures, convergence can be slow, and because threads are typically not + * bound to CPUS forever, may not occur at all. However, despite these limitations, observed + * contention rates are typically low in these cases. + * + * It is possible for a Buffer to become unused when threads that once hashed to it terminate, as + * well as in the case where doubling the table causes no thread to hash to it under expanded + * mask. 
We do not try to detect or remove buffers, under the assumption that for long-running + * instances, observed contention levels will recur, so the buffers will eventually be needed + * again; and for short-lived ones, it does not matter. + */ + + static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy"); + static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); + + /** Number of CPUS. */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** The bound on the table size. */ + static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU); + + /** The maximum number of attempts when trying to expand the table. */ + static final int ATTEMPTS = 3; + + /** Table of buffers. When non-null, size is a power of 2. */ + transient volatile Buffer[] table; + + /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */ + transient volatile int tableBusy; + + /** CASes the tableBusy field from 0 to 1 to acquire lock. */ + final boolean casTableBusy() { + return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1); + } + + /** + * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of + * packaging restrictions. + */ + static final int getProbe() { + return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); + } + + /** + * Pseudo-randomly advances and records the given probe value for the given thread. Duplicated + * from ThreadLocalRandom because of packaging restrictions. + */ + static final int advanceProbe(int probe) { + probe ^= probe << 13; // xorshift + probe ^= probe >>> 17; + probe ^= probe << 5; + UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + return probe; + } + + /** Returns the closest power-of-two at or higher than the given value. */ + static int ceilingNextPowerOfTwo(int x) { + // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + } + + /** + * Creates a new buffer instance after resizing to accommodate a producer. + * + * @param e the producer's element + * @return a newly created buffer populated with a single element + */ + protected abstract Buffer create(E e); + + @Override + public int offer(E e) { + int mask; + int result = 0; + Buffer buffer; + boolean uncontended = true; + Buffer[] buffers = table; + if ((buffers == null) + || (mask = buffers.length - 1) < 0 + || (buffer = buffers[getProbe() & mask]) == null + || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) { + expandOrRetry(e, uncontended); + } + return result; + } + + @Override + public void drain(Consumer consumer) { + Buffer[] buffers = table; + if (buffers == null) { + return; + } + for (Buffer buffer : buffers) { + if (buffer != null) { + buffer.drain(consumer); + } + } + } + + @Override + public int size() { + return writes() - reads(); + } + + @Override + public int reads() { + Buffer[] buffers = table; + if (buffers == null) { + return 0; + } + int reads = 0; + for (Buffer buffer : buffers) { + if (buffer != null) { + reads += buffer.reads(); + } + } + return reads; + } + + @Override + public int writes() { + Buffer[] buffers = table; + if (buffers == null) { + return 0; + } + int writes = 0; + for (Buffer buffer : buffers) { + if (buffer != null) { + writes += buffer.writes(); + } + } + return writes; + } + + /** + * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or + * contention. 
See above for explanation. This method suffers the usual non-modularity problems of + * optimistic retry code, relying on rechecked sets of reads. + * + * @param e the element to add + * @param wasUncontended false if CAS failed before call + */ + final void expandOrRetry(E e, boolean wasUncontended) { + int h; + if ((h = getProbe()) == 0) { + ThreadLocalRandom.current(); // force initialization + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (int attempt = 0; attempt < ATTEMPTS; attempt++) { + Buffer[] buffers; + Buffer buffer; + int n; + if ((buffers = table) != null && (n = buffers.length) > 0) { + if ((buffer = buffers[(n - 1) & h]) == null) { + if (tableBusy == 0) { // Try to attach new Buffer + if (tableBusy == 0 && casTableBusy()) { + boolean created = false; + try { // Recheck under lock + Buffer[] rs; + int mask, j; + if (((rs = table) != null) && ((mask = rs.length) > 0) + && (rs[j = (mask - 1) & h] == null)) { + rs[j] = create(e); + created = true; + } + } finally { + tableBusy = 0; + } + if (created) { + break; + } + continue; // Slot is now non-empty + } + } + collide = false; + } else if (!wasUncontended) { // CAS already known to fail + wasUncontended = true; // Continue after rehash + } else if (buffer.offer(e) != Buffer.FAILED) { + break; + } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) { + collide = false; // At max size or stale + } else if (!collide) { + collide = true; + } else if (tableBusy == 0 && casTableBusy()) { + try { + if (table == buffers) { // Expand table unless stale + @SuppressWarnings({"unchecked", "rawtypes"}) + Buffer[] rs = new Buffer[n << 1]; + for (int i = 0; i < n; ++i) { + rs[i] = buffers[i]; + } + table = rs; + } + } finally { + tableBusy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h = advanceProbe(h); + } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) { + boolean init = false; + try { // Initialize table + if (table == buffers) { + @SuppressWarnings({"unchecked", "rawtypes"}) + Buffer[] rs = new Buffer[1]; + rs[0] = create(e); + table = rs; + init = true; + } + } finally { + tableBusy = 0; + } + if (init) { + break; + } + } + } + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java index de4fb52452..dbce777f20 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedBufferTest.java @@ -42,10 +42,10 @@ public Object[][] buffer() { } @Test(dataProvider = "buffer") - public void submit(BoundedBuffer buffer) { + public void offer(BoundedBuffer buffer) { ConcurrentTestHarness.timeTasks(10, () -> { for (int i = 0; i < 100; i++) { - buffer.submit(DUMMY); + buffer.offer(DUMMY); } }); assertThat(buffer.writes(), is(greaterThan(0))); @@ -54,8 +54,8 @@ public void submit(BoundedBuffer buffer) { @Test(dataProvider = "buffer") public void drain(BoundedBuffer buffer) { - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { - buffer.submit(DUMMY); + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { + buffer.offer(DUMMY); } int[] read = new int[1]; buffer.drain(e -> read[0]++); @@ -64,12 +64,12 @@ public void drain(BoundedBuffer buffer) { } @Test(dataProvider = "buffer") - public void submitAndDrain(BoundedBuffer buffer) { + public void offerAndDrain(BoundedBuffer buffer) { Lock lock = new 
ReentrantLock(); AtomicInteger reads = new AtomicInteger(); ConcurrentTestHarness.timeTasks(10, () -> { for (int i = 0; i < 1000; i++) { - boolean shouldDrain = !buffer.submit(DUMMY); + boolean shouldDrain = (buffer.offer(DUMMY) == Buffer.FULL); if (shouldDrain && lock.tryLock()) { buffer.drain(e -> reads.incrementAndGet()); lock.unlock(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 82cef0b8bd..8bfb7be3ce 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -253,14 +253,14 @@ public void exceedsMaximumBufferSize_onRead(Cache cache) { BoundedLocalCache localCache = asBoundedLocalCache(cache); Node dummy = localCache.nodeFactory.newNode(null, null, null, 1, 0); - BoundedBuffer> buffer = localCache.readBuffer; - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { - buffer.submit(dummy); + Buffer> buffer = localCache.readBuffer; + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { + buffer.offer(dummy); } - assertThat(buffer.submit(dummy), is(false)); + assertThat(buffer.offer(dummy), is(Buffer.FULL)); localCache.afterRead(dummy, true); - assertThat(buffer.submit(dummy), is(true)); + assertThat(buffer.offer(dummy), is(not(Buffer.FULL))); } @Test(dataProvider = "caches") @@ -283,14 +283,14 @@ public void exceedsMaximumBufferSize_onWrite(Cache cache) { public void drain_onRead(Cache cache, CacheContext context) { BoundedLocalCache localCache = asBoundedLocalCache(cache); - BoundedBuffer> buffer = localCache.readBuffer; - for (int i = 0; i < BoundedBuffer.RING_BUFFER_SIZE; i++) { + Buffer> buffer = localCache.readBuffer; + for (int i = 0; i < BoundedBuffer.BUFFER_SIZE; i++) { localCache.get(context.firstKey()); } int pending = buffer.size(); assertThat(buffer.writes(), is(equalTo(pending))); - assertThat(pending, is(BoundedBuffer.RING_BUFFER_SIZE)); + assertThat(pending, is(BoundedBuffer.BUFFER_SIZE)); localCache.get(context.firstKey()); assertThat(buffer.size(), is(0)); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java index 3afce00d2f..5a88fb2a03 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/IsValidBoundedLocalCache.java @@ -72,7 +72,7 @@ private void checkReadBuffer(BoundedLocalCache cache) { if (!cache.evicts() && !cache.expiresAfterAccess()) { return; } - BoundedBuffer buffer = cache.readBuffer; + Buffer buffer = cache.readBuffer; desc.expectThat("buffer is empty", buffer.size(), is(0)); desc.expectThat("buffer reads = writes", buffer.reads(), is(buffer.writes())); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java new file mode 100644 index 0000000000..9eaefefaf4 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2015 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static java.util.Objects.requireNonNull; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; + +import java.util.function.Consumer; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.ConcurrentTestHarness; +import com.google.common.base.MoreObjects; + +/** + * @author ben.manes@gmail.com (Ben Manes) + */ +public final class StripedBufferTest { + static final Integer ELEMENT = 1; + + @Test(dataProvider = "buffers") + public void init(FakeBuffer buffer) { + assertThat(buffer.table, is(nullValue())); + + buffer.offer(ELEMENT); + assertThat(buffer.table.length, is(1)); + } + + @Test(dataProvider = "buffers") + public void produce(FakeBuffer buffer) { + ConcurrentTestHarness.timeTasks(10, () -> { + for (int i = 0; i < 10; i++) { + buffer.offer(ELEMENT); + Thread.yield(); + } + }); + assertThat(buffer.table.length, lessThanOrEqualTo(StripedBuffer.MAXIMUM_TABLE_SIZE)); + } + + @Test(dataProvider = "buffers") + public void drain(FakeBuffer buffer) { + buffer.drain(e -> {}); + assertThat(buffer.drains, is(0)); + + // Expand and drain + buffer.offer(ELEMENT); + buffer.drain(e -> {}); + assertThat(buffer.drains, is(1)); + } + + @DataProvider + public Object[][] buffers() { + return new Object[][] { + { new FakeBuffer(Buffer.FULL) }, + { new FakeBuffer(Buffer.FAILED) }, + { new FakeBuffer(Buffer.SUCCESS) }, + }; + } + + static int ceilingNextPowerOfTwo(int x) { + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1)); + } + + static final class FakeBuffer extends StripedBuffer { + final int result; + int drains = 0; + + FakeBuffer(int result) { + this.result = requireNonNull(result); + } + + @Override protected Buffer create(E e) { + return new Buffer() { + @Override public int offer(E e) { + return result; + } + @Override public void drain(Consumer consumer) { + drains++; + } + @Override public int size() { + return 0; + } + @Override public int reads() { + return 0; + } + @Override public int writes() { + return 0; + } + }; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).addValue(result).toString(); + } + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java index 06a8d8310c..6f707cad39 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferTest.java @@ -42,7 +42,7 @@ public Iterator buffers() { } @Test(dataProvider = "buffers") - public void record(Buffer buffer) { + public void record(ReadBuffer buffer) { ConcurrentTestHarness.timeTasks(100, () -> { for (int i = 0; i < 1000; i++) { buffer.record(); @@ -50,12 +50,12 @@ public void record(Buffer buffer) { } }); long recorded = buffer.recorded(); - assertThat(recorded, 
is((long) Buffer.MAX_SIZE)); + assertThat(recorded, is((long) ReadBuffer.MAX_SIZE)); } @Test(dataProvider = "buffers") - public void drain(Buffer buffer) { - for (int i = 0; i < 2 * Buffer.MAX_SIZE; i++) { + public void drain(ReadBuffer buffer) { + for (int i = 0; i < 2 * ReadBuffer.MAX_SIZE; i++) { buffer.record(); } buffer.drain(); @@ -65,7 +65,7 @@ public void drain(Buffer buffer) { } @Test(dataProvider = "buffers") - public void recordAndDrain(Buffer buffer) { + public void recordAndDrain(ReadBuffer buffer) { ConcurrentTestHarness.timeTasks(100, () -> { for (int i = 0; i < 1000; i++) { boolean shouldDrain = buffer.record(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java index 2848727fb5..2e55b4efd4 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/BufferType.java @@ -30,14 +30,14 @@ public enum BufferType { MpmcArray(MpmcArrayBuffer::new), MpscCompound(MpscCompoundBuffer::new); - private final Supplier factory; + private final Supplier factory; - private BufferType(Supplier factory) { + private BufferType(Supplier factory) { this.factory = factory; } /** Returns a new buffer instance. */ - public Buffer create() { + public ReadBuffer create() { return factory.get(); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java index 4acdf088cc..e428e036a2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/FastFlowBuffer.java @@ -33,7 +33,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class FastFlowBuffer implements Buffer { +final class FastFlowBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong readCache; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java index 4b8eb0552d..d57a0fd663 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ManyToOneBuffer.java @@ -29,7 +29,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class ManyToOneBuffer implements Buffer { +final class ManyToOneBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong readCounter; final AtomicLong writeCounter; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java index 8117c3fc4d..3f84ecb88d 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpmcArrayBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpmcArrayBuffer implements Buffer { +final class MpmcArrayBuffer implements ReadBuffer { final MpmcArrayQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java index 
8ead9a3ee1..c868c71fde 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscArrayBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpscArrayBuffer implements Buffer { +final class MpscArrayBuffer implements ReadBuffer { final MpscArrayQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java index e3f29c5a46..f5e0a1ce49 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/MpscCompoundBuffer.java @@ -24,7 +24,7 @@ /** * @author ben.manes@gmail.com (Ben Manes) */ -final class MpscCompoundBuffer implements Buffer { +final class MpscCompoundBuffer implements ReadBuffer { final MpscCompoundQueue queue; final Lock evictionLock; long drained; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java similarity index 97% rename from caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java rename to caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java index b2edd7b751..55b9124404 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/Buffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/ReadBuffer.java @@ -21,7 +21,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -public interface Buffer { +public interface ReadBuffer { static final int MAX_SIZE = 32; // power of 2 static final int MAX_SIZE_MASK = MAX_SIZE - 1; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java index a3d85760c4..85bf69df89 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/buffer/TicketBuffer.java @@ -33,7 +33,7 @@ * * @author ben.manes@gmail.com (Ben Manes) */ -final class TicketBuffer implements Buffer { +final class TicketBuffer implements ReadBuffer { final Lock evictionLock; final AtomicLong writeCounter; final AtomicReference[] buffer; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 227d50e358..c4bd9bf766 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -51,13 +51,13 @@ ext { benchmark_versions = [ concurrentlinkedhashmap: '1.4.2', high_scale_lib: '1.0.6', - jamm: '0.3.0', + jamm: '0.3.1', java_object_layout: '0.3.1', koloboke: '0.6.6', ] plugin_versions = [ bundle: '0.6.2', - checkstyle: '6.4.1', + checkstyle: '6.5', coveralls: '2.3.1', error_prone: '0.0.6', jmh: '0.2.0',
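
The patch above introduces three cooperating pieces: a lossy per-stripe ring buffer (BoundedBuffer.RingBuffer), tri-state offer results (Buffer.FULL / SUCCESS / FAILED), and a Striped64-style table of stripes (StripedBuffer) that grows under contention. The stand-alone sketch below condenses the per-stripe mechanics into one class so they can be read without the surrounding cache. It is illustrative only: LossyRingBuffer and ReadBufferDemo are hypothetical names, the stripe-selection and table-expansion logic of StripedBuffer is omitted, and the real RingBuffer is seeded with its first element at construction rather than starting empty.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Illustrative stand-alone sketch; not the library's BoundedBuffer. */
final class LossyRingBuffer<E> {
  static final int FULL = 1;
  static final int SUCCESS = 0;
  static final int FAILED = -1;
  static final int BUFFER_SIZE = 32;              // power-of-two capacity
  static final int BUFFER_MASK = BUFFER_SIZE - 1; // index mask

  final AtomicLong readCounter = new AtomicLong();
  final AtomicLong writeCounter = new AtomicLong();
  final AtomicReferenceArray<E> buffer = new AtomicReferenceArray<>(BUFFER_SIZE);

  /** A single CAS attempt; never retries or blocks, so it may fail spuriously under contention. */
  int offer(E e) {
    long head = readCounter.get();
    long tail = writeCounter.get();
    if ((tail - head) >= BUFFER_SIZE) {
      return FULL;                                   // reject when at capacity
    }
    if (writeCounter.compareAndSet(tail, tail + 1)) {
      buffer.lazySet((int) (tail & BUFFER_MASK), e); // lazily publish the element
      return SUCCESS;
    }
    return FAILED;                                   // lost the race; a striped buffer would rehash
  }

  /** Single-consumer drain; the caller must hold exclusive read access. */
  void drain(Consumer<E> consumer) {
    long head = readCounter.get();
    long tail = writeCounter.get();
    while (head != tail) {
      int index = (int) (head & BUFFER_MASK);
      E e = buffer.get(index);
      if (e == null) {
        break;                                       // a producer won the CAS but has not published yet
      }
      buffer.lazySet(index, null);
      consumer.accept(e);
      head++;
    }
    readCounter.lazySet(head);
  }
}

class ReadBufferDemo {
  public static void main(String[] args) {
    LossyRingBuffer<String> buffer = new LossyRingBuffer<>();
    ReentrantLock lock = new ReentrantLock();

    // Producer-side pattern from BoundedBufferTest.offerAndDrain: drain only when the
    // buffer reports FULL, and only if the (eviction) lock can be acquired without blocking.
    for (int i = 0; i < 100; i++) {
      if ((buffer.offer("event-" + i) == LossyRingBuffer.FULL) && lock.tryLock()) {
        try {
          buffer.drain(e -> System.out.println("drained " + e));
        } finally {
          lock.unlock();
        }
      }
    }
  }
}

The demo mirrors how the cache consumes offer's return value in afterRead: a FULL result schedules a drain, but the drain runs only under the try-lock, preserving the single-consumer requirement while producers stay non-blocking.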