From 83ae0dcd365b2db5ab0f5a93ce0c56c0bc914de4 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 6 Dec 2020 21:06:00 -0800 Subject: [PATCH] Fix write-time optimization for variable expiration (fixes #478) A high write rate to the same key can overwhelm the write buffer as it may not drop entries and has a maximum capacity. When full this causes backpressure to allow for the mainance task to catch up. A write only needs to be recorded in this buffer when a major event occurs, such as the entry's size changed or expiration time differs. To improve throughput for expireAfterWrite a tolerance of 1s is used to allow for skipping the write buffer and recording into the read buffer instead. This improves throughput by 5x in a same key write benchmark. This optimization was not updated to take into account variable expiration, where the expire time may change based on the calculation determined by configured Expiry. This would cause the entry to not be reordered in the TimerWheel, possibly delaying the automic removal indefinitely. Now when the variable time differs by +/- 1s then the write-time reordering is required. Co-authored-by: Christopher Ng --- .../caffeine/cache/BoundedLocalCache.java | 9 ++- .../caffeine/cache/BoundedLocalCacheTest.java | 79 +++++++++++++++++++ checksum.xml | 1 + gradle/dependencies.gradle | 8 +- 4 files changed, 90 insertions(+), 7 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 3a9d6aa646..1e9f009e53 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -2026,7 +2026,7 @@ public Map getAllPresent(Iterable keys) { int oldWeight; boolean expired = false; boolean mayUpdate = true; - boolean exceedsTolerance = true; + boolean exceedsTolerance = false; synchronized (prior) { if (!prior.isAlive()) { continue; @@ -2051,7 +2051,10 @@ public Map getAllPresent(Iterable keys) { writer.write(key, value); } if (mayUpdate) { - exceedsTolerance = ((now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE); + exceedsTolerance = + (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE) + || (expiresVariable() + && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE); setWriteTime(prior, now); prior.setWeight(newWeight); @@ -2075,7 +2078,7 @@ public Map getAllPresent(Iterable keys) { int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0; if ((oldValue == null) || (weightedDifference != 0) || expired) { afterWrite(new UpdateTask(prior, weightedDifference)); - } else if (!onlyIfAbsent && expiresAfterWrite() && exceedsTolerance) { + } else if (!onlyIfAbsent && exceedsTolerance) { afterWrite(new UpdateTask(prior, weightedDifference)); } else { if (mayUpdate) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 30b8b40f1c..c99ab39efc 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -19,6 +19,7 @@ import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_IDLE; import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_REQUIRED; import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED; +import static com.github.benmanes.caffeine.cache.BoundedLocalCache.EXPIRE_WRITE_TOLERANCE; import static com.github.benmanes.caffeine.cache.BoundedLocalCache.PERCENT_MAIN_PROTECTED; import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications; import static com.github.benmanes.caffeine.cache.testing.HasStats.hasEvictionCount; @@ -34,10 +35,13 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -85,6 +89,8 @@ static BoundedLocalCache asBoundedLocalCache(Cache) cache.asMap(); } + /* --------------- Maintenance --------------- */ + @Test @SuppressWarnings("UnusedVariable") public void cleanupTask_allowGc() { @@ -169,6 +175,8 @@ public void scheduleDrainBuffers_rejected(Cache cache, CacheCo cache.put(context.absentKey(), context.absentValue()); } + /* --------------- Eviction --------------- */ + @Test public void putWeighted_noOverflow() { Cache cache = Caffeine.newBuilder() @@ -708,4 +716,75 @@ private void adapt(Cache cache, // Fill main protected space cache.asMap().keySet().stream().forEach(cache::getIfPresent); } + + /* --------------- Expiration --------------- */ + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, initialCapacity = InitialCapacity.FULL, + expireAfterWrite = Expire.ONE_MINUTE) + public void put_expireTolerance_expireAfterWrite( + Cache cache, CacheContext context) { + BoundedLocalCache localCache = asBoundedLocalCache(cache); + boolean mayCheckReads = context.isStrongKeys() && context.isStrongValues() + && localCache.readBuffer != Buffer.>disabled(); + + cache.put(1, 1); + assertThat(localCache.writeBuffer().producerIndex, is(2L)); + + // If within the tolerance, treat the update as a read + cache.put(1, 2); + if (mayCheckReads) { + assertThat(localCache.readBuffer.reads(), is(0)); + assertThat(localCache.readBuffer.writes(), is(1)); + } + assertThat(localCache.writeBuffer().producerIndex, is(2L)); + + // If exceeds the tolerance, treat the update as a write + context.ticker().advance(EXPIRE_WRITE_TOLERANCE + 1, TimeUnit.NANOSECONDS); + cache.put(1, 3); + if (mayCheckReads) { + assertThat(localCache.readBuffer.reads(), is(1)); + assertThat(localCache.readBuffer.writes(), is(1)); + } + assertThat(localCache.writeBuffer().producerIndex, is(4L)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expiry = CacheExpiry.MOCKITO, expiryTime = Expire.ONE_MINUTE) + public void put_expireTolerance_expiry(Cache cache, CacheContext context) { + BoundedLocalCache localCache = asBoundedLocalCache(cache); + cache.put(1, 1); + assertThat(localCache.writeBuffer().producerIndex, is(2L)); + + // If within the tolerance, treat the update as a read + cache.put(1, 2); + assertThat(localCache.readBuffer.reads(), is(0)); + assertThat(localCache.readBuffer.writes(), is(1)); + assertThat(localCache.writeBuffer().producerIndex, is(2L)); + + // If exceeds the tolerance, treat the update as a write + context.ticker().advance(EXPIRE_WRITE_TOLERANCE + 1, TimeUnit.NANOSECONDS); + cache.put(1, 3); + assertThat(localCache.readBuffer.reads(), is(1)); + assertThat(localCache.readBuffer.writes(), is(1)); + assertThat(localCache.writeBuffer().producerIndex, is(4L)); + + // If the expire time reduces by more than the tolerance, treat the update as a write + when(context.expiry().expireAfterUpdate(any(), any(), anyLong(), anyLong())) + .thenReturn(Expire.ONE_MILLISECOND.timeNanos()); + cache.put(1, 4); + assertThat(localCache.readBuffer.reads(), is(1)); + assertThat(localCache.readBuffer.writes(), is(1)); + assertThat(localCache.writeBuffer().producerIndex, is(6L)); + + // If the expire time increases by more than the tolerance, treat the update as a write + when(context.expiry().expireAfterUpdate(any(), any(), anyLong(), anyLong())) + .thenReturn(Expire.FOREVER.timeNanos()); + cache.put(1, 4); + assertThat(localCache.readBuffer.reads(), is(1)); + assertThat(localCache.readBuffer.writes(), is(1)); + assertThat(localCache.writeBuffer().producerIndex, is(8L)); + } } diff --git a/checksum.xml b/checksum.xml index ad8036a634..2db249c803 100644 --- a/checksum.xml +++ b/checksum.xml @@ -95,6 +95,7 @@ + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index ad4df67aa3..42aa3c289e 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -26,8 +26,8 @@ ext { versions = [ akka: '2.6.10', - cache2k: '1.9.3.Alpha', - checkerFramework: '3.7.1', + cache2k: '1.9.4.Beta', + checkerFramework: '3.8.0', coherence: '20.06', collision: '0.3.3', commonsCompress: '1.20', @@ -41,7 +41,7 @@ ext { elasticSearch: '7.10.0', expiringMap: '0.5.9', fastfilter: 'bf0b02297f', - fastutil: '8.4.3', + fastutil: '8.4.4', flipTables: '1.1.0', googleJavaFormat: '1.7', guava: '30.0-jre', @@ -97,7 +97,7 @@ ext { semanticVersioning: '1.1.0', shadow: '6.1.0', sonarqube: '3.0', - spotbugs: '4.1.4', + spotbugs: '4.2.0', spotbugsPlugin: '4.6.0', stats: '0.2.2', versions: '0.36.0',