diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java index 273ed83bd1..ca29ef92d0 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java @@ -15,7 +15,7 @@ */ package com.github.benmanes.caffeine.cache; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; import com.github.benmanes.caffeine.base.UnsafeAccess; @@ -47,10 +47,12 @@ final class BoundedBuffer extends StripedBuffer { */ /** The maximum number of elements per buffer. */ - static final int BUFFER_SIZE = 32; + static final int BUFFER_SIZE = 16; - /** Mask value for indexing into the ring buffer. */ - static final int BUFFER_MASK = BUFFER_SIZE - 1; + // Assume 4-byte references and 64-byte cache line (16 elements per line) + static final int SPACED_SIZE = BUFFER_SIZE << 4; + static final int SPACED_MASK = SPACED_SIZE - 1; + static final int OFFSET = 16; @Override protected Buffer create(E e) { @@ -58,16 +60,13 @@ protected Buffer create(E e) { } static final class RingBuffer extends BBHeader.ReadAndWriteCounterRef implements Buffer { - final AtomicReference[] buffer; + final AtomicReferenceArray buffer; @SuppressWarnings({"unchecked", "cast", "rawtypes"}) public RingBuffer(E e) { - super(1); - buffer = new AtomicReference[BUFFER_SIZE]; - for (int i = 0; i < BUFFER_SIZE; i++) { - buffer[i] = new AtomicReference<>(); - } - buffer[0].lazySet(e); + super(OFFSET); + buffer = new AtomicReferenceArray<>(SPACED_SIZE); + buffer.lazySet(0, e); } @Override @@ -75,12 +74,12 @@ public int offer(E e) { long head = readCounter; long tail = relaxedWriteCounter(); long size = (tail - head); - if (size >= BUFFER_SIZE) { + if (size >= SPACED_SIZE) { return Buffer.FULL; } - if (casWriteCounter(tail, tail + 1)) { - int index = (int) (tail & BUFFER_MASK); - buffer[index].lazySet(e); + if (casWriteCounter(tail, tail + OFFSET)) { + int index = (int) (tail & SPACED_MASK); + buffer.lazySet(index, e); return Buffer.SUCCESS; } return Buffer.FAILED; @@ -95,28 +94,27 @@ public void drainTo(Consumer consumer) { return; } do { - int index = (int) (head & BUFFER_MASK); - AtomicReference slot = buffer[index]; - E e = slot.get(); + int index = (int) (head & SPACED_MASK); + E e = buffer.get(index); if (e == null) { // not published yet break; } - slot.lazySet(null); + buffer.lazySet(index, null); consumer.accept(e); - head++; + head += OFFSET; } while (head != tail); lazySetReadCounter(head); } @Override public int reads() { - return (int) readCounter; + return (int) readCounter / OFFSET; } @Override public int writes() { - return (int) writeCounter; + return (int) writeCounter / OFFSET; } } }