From 49fed1e4fb41af221f6c4f798525cdd4f8dde26b Mon Sep 17 00:00:00 2001
From: Ben Manes <ben.manes@gmail.com>
Date: Sun, 28 Feb 2016 17:31:20 -0800
Subject: [PATCH] Fix async refresh from prematurely replacing the value

When a refresh-after-write is triggered, the entry should be reloaded in
the background, the present value should remain available for reads, and
the entry should be atomically replaced once the new value has been
loaded. This operation is atomic with respect to other writes to that
entry and blocks (rather than clobbers) if another write is attempted.

This should work the same for synchronous and asynchronous caches, but
unfortunately it did not. For an async cache a new, incomplete future was
immediately put into the cache and could be consumed by the next request.
Due to layering, the custom asyncReload was not called and the operation
was delegated to load instead. This was of course wrong and not the
intended (or expected) behavior, so it is now fixed.

Thanks to Etienne Houle @ Stingray for notifying me of this problem.
---
 .../caffeine/profiler/CacheProfiler.java      |  9 +++-
 .../caffeine/cache/BoundedLocalCache.java     | 54 ++++++++++++++-----
 .../caffeine/cache/UnboundedLocalCache.java   |  5 +-
 .../caffeine/cache/AsyncLoadingCacheTest.java | 31 +++++++++++
 .../caffeine/cache/RefreshAfterWriteTest.java | 40 ++++++++++++++
 .../caffeine/cache/testing/CacheContext.java  |  2 +
 .../caffeine/cache/testing/CacheProvider.java | 30 ++++++-----
 .../testing/CaffeineCacheFromContext.java     |  2 +
 gradle/dependencies.gradle                    |  6 +--
 9 files changed, 146 insertions(+), 33 deletions(-)
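To make the intended behavior concrete from the caller's side, here is a minimal
sketch (not part of the patch; the RefreshBehaviorSketch class, the slowLookup
loader, and the key/values are hypothetical stand-ins):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class RefreshBehaviorSketch {
  public static void main(String[] args) {
    AsyncLoadingCache<String, String> cache = Caffeine.newBuilder()
        .refreshAfterWrite(1, TimeUnit.MINUTES)
        .buildAsync(key -> slowLookup(key));  // the CacheLoader runs on the executor

    cache.synchronous().put("config", "v1");

    // Once the refresh interval has elapsed, a read triggers a background reload.
    // The already-completed future for "v1" keeps being served until the reload
    // finishes, and only then is the entry atomically replaced with the new value.
    CompletableFuture<String> value = cache.get("config");
    System.out.println(value.join());  // prints "v1" until the reload completes
  }

  private static String slowLookup(String key) {
    return "v2";  // stand-in for an expensive load
  }
}

The new AsyncLoadingCacheTest#refresh and RefreshAfterWriteTest#get_sameFuture
tests below assert exactly this: while a reload is in flight, readers continue to
receive the same completed future instance.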
diff --git a/caffeine/src/jmh/java/com/github/benmanes/caffeine/profiler/CacheProfiler.java b/caffeine/src/jmh/java/com/github/benmanes/caffeine/profiler/CacheProfiler.java
index c643ae05c4..beebd321a3 100644
--- a/caffeine/src/jmh/java/com/github/benmanes/caffeine/profiler/CacheProfiler.java
+++ b/caffeine/src/jmh/java/com/github/benmanes/caffeine/profiler/CacheProfiler.java
@@ -40,8 +40,15 @@ public final class CacheProfiler extends ProfilerHook {
   final boolean reads;
 
   CacheProfiler() {
-    ints = new Integer[SIZE];
     cache = cacheType.create(2 * SIZE);
+
+    // Ensure full initialization of internal structures
+    for (int i = 0; i < 2 * SIZE; i++) {
+      cache.put(i, Boolean.TRUE);
+    }
+    cache.clear();
+
+    ints = new Integer[SIZE];
     NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
     for (int i = 0; i < SIZE; i++) {
       ints[i] = generator.nextValue().intValue();
diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
index d162e76fda..e21445f742 100644
--- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
+++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java
@@ -74,7 +74,7 @@
  * for a given configuration are used.
  *
  * @author ben.manes@gmail.com (Ben Manes)
- * @param <K> the type of keys maintained by this map
+ * @param <K> the type of keys maintained by this cache
  * @param <V> the type of mapped values
  */
 @ThreadSafe
@@ -727,8 +727,6 @@ void afterRead(Node<K, V> node, long now, boolean recordHit) {
     }
     node.setAccessTime(now);
 
-    // fastpath is disabled due to unfavorable benchmarks
-    // boolean delayable = canFastpath(node) || (readBuffer.offer(node) != Buffer.FULL);
     boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
     if (shouldDrainBuffers(delayable)) {
       scheduleDrainBuffers();
@@ -751,20 +749,34 @@ void refreshIfNeeded(Node<K, V> node, long now) {
     if (!refreshAfterWrite()) {
       return;
     }
-    long writeTime = node.getWriteTime();
-    if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) {
+    long oldWriteTime = node.getWriteTime();
+    long refreshWriteTime = isAsync ? Long.MAX_VALUE : now;
+    if (((now - oldWriteTime) > refreshAfterWriteNanos())
+        && node.casWriteTime(oldWriteTime, refreshWriteTime)) {
       try {
         executor().execute(() -> {
           K key = node.getKey();
           if ((key != null) && node.isAlive()) {
            BiFunction<K, V, V> refreshFunction = (k, v) -> {
-              if (node.getWriteTime() != now) {
+              if (node.getWriteTime() != refreshWriteTime) {
                 return v;
               }
               try {
+                if (isAsync) {
+                  @SuppressWarnings("unchecked")
+                  V oldValue = ((CompletableFuture<V>) v).join();
+                  CompletableFuture<V> future =
+                      cacheLoader.asyncReload(key, oldValue, Runnable::run);
+                  if (future.join() == null) {
+                    return null;
+                  }
+                  @SuppressWarnings("unchecked")
+                  V castFuture = (V) future;
+                  return castFuture;
+                }
                 return cacheLoader.reload(k, v);
               } catch (Exception e) {
-                node.setWriteTime(writeTime);
+                node.setWriteTime(oldWriteTime);
                 return LocalCache.throwUnchecked(e);
               }
             };
@@ -2916,26 +2928,40 @@ static final class BoundedLocalAsyncLoadingCache<K, V>
     @SuppressWarnings("unchecked")
     BoundedLocalAsyncLoadingCache(Caffeine<K, V> builder, AsyncCacheLoader<? super K, V> loader) {
-      super(LocalCacheFactory.newBoundedLocalCache((Caffeine<K, CompletableFuture<V>>) builder,
-          asyncLoader(loader, builder), true), loader);
+      super((BoundedLocalCache<K, CompletableFuture<V>>) LocalCacheFactory.newBoundedLocalCache(
+          builder, asyncLoader(loader, builder), true), loader);
       isWeighted = builder.isWeighted();
     }
 
-    private static <K, V> CacheLoader<K, CompletableFuture<V>> asyncLoader(
+    private static <K, V> CacheLoader<K, V> asyncLoader(
         AsyncCacheLoader<K, V> loader, Caffeine<?, ?> builder) {
       Executor executor = builder.getExecutor();
-      return key -> loader.asyncLoad(key, executor);
+      return new CacheLoader<K, V>() {
+        @Override public V load(K key) {
+          @SuppressWarnings("unchecked")
+          V newValue = (V) loader.asyncLoad(key, executor);
+          return newValue;
+        }
+        @Override public V reload(K key, V oldValue) {
+          @SuppressWarnings("unchecked")
+          V newValue = (V) loader.asyncReload(key, oldValue, executor);
+          return newValue;
+        }
+        @Override public CompletableFuture<V> asyncReload(K key, V oldValue, Executor executor) {
+          return loader.asyncReload(key, oldValue, executor);
+        }
+      };
     }
 
     @Override protected Policy<K, V> policy() {
       if (policy == null) {
         @SuppressWarnings("unchecked")
-        BoundedLocalCache<K, V> castedCache = (BoundedLocalCache<K, V>) cache;
+        BoundedLocalCache<K, V> castCache = (BoundedLocalCache<K, V>) cache;
         Function<CompletableFuture<V>, V> transformer = Async::getIfReady;
         @SuppressWarnings("unchecked")
-        Function<V, V> castedTransformer = (Function<V, V>) transformer;
-        policy = new BoundedPolicy<>(castedCache, castedTransformer, isWeighted);
+        Function<V, V> castTransformer = (Function<V, V>) transformer;
+        policy = new BoundedPolicy<>(castCache, castTransformer, isWeighted);
       }
       return policy;
     }
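The BoundedLocalAsyncLoadingCache change above is what lets a background refresh
reach a user-supplied asyncReload at all. As a rough illustration of what that
enables (this class is not from the repository; DeltaLoader and its fetch methods
are hypothetical), an AsyncCacheLoader can make its reload path much cheaper than
a full load:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

// Hypothetical loader: a full load is expensive, but a refresh can reuse the old
// value and fetch only what changed. Before this fix, refreshAfterWrite on an async
// cache never reached asyncReload; with the delegation above it is invoked as expected.
final class DeltaLoader implements AsyncCacheLoader<String, String> {
  @Override
  public CompletableFuture<String> asyncLoad(String key, Executor executor) {
    return CompletableFuture.supplyAsync(() -> expensiveFetch(key), executor);
  }

  @Override
  public CompletableFuture<String> asyncReload(String key, String oldValue, Executor executor) {
    return CompletableFuture.supplyAsync(() -> cheapDelta(key, oldValue), executor);
  }

  private String expensiveFetch(String key) {
    return "value-of-" + key;    // stand-in for a remote call
  }

  private String cheapDelta(String key, String oldValue) {
    return oldValue + "+delta";  // stand-in for an incremental update
  }
}

With this patch, a cache built via Caffeine.newBuilder().refreshAfterWrite(1,
TimeUnit.MINUTES).buildAsync(new DeltaLoader()) routes refreshes through
asyncReload instead of delegating to load and publishing a new, incomplete future.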
diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java
index 7ddf3d685d..773b6e663c 100644
--- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java
+++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java
@@ -53,6 +53,8 @@ final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
   @Nullable final RemovalListener<K, V> removalListener;
   final ConcurrentHashMap<K, V> data;
+  final StatsCounter statsCounter;
+  final boolean isRecordingStats;
   final CacheWriter<K, V> writer;
   final Executor executor;
   final Ticker ticker;
@@ -61,9 +63,6 @@ final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
   transient Collection<V> values;
   transient Set<Entry<K, V>> entrySet;
 
-  boolean isRecordingStats;
-  StatsCounter statsCounter;
-
   UnboundedLocalCache(Caffeine<? super K, ? super V> builder, boolean async) {
     this.data = new ConcurrentHashMap<>(builder.getInitialCapacity());
     this.statsCounter = builder.getStatsCounterSupplier().get();
diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/AsyncLoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/AsyncLoadingCacheTest.java
index a1db76eb98..3b63f27801 100644
--- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/AsyncLoadingCacheTest.java
+++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/AsyncLoadingCacheTest.java
@@ -21,6 +21,7 @@
 import static com.github.benmanes.caffeine.cache.testing.HasStats.hasLoadFailureCount;
 import static com.github.benmanes.caffeine.cache.testing.HasStats.hasLoadSuccessCount;
 import static com.github.benmanes.caffeine.cache.testing.HasStats.hasMissCount;
+import static com.github.benmanes.caffeine.testing.Awaits.await;
 import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.both;
@@ -30,6 +31,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,10 +51,13 @@
 import com.github.benmanes.caffeine.cache.testing.CacheProvider;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer;
 import com.github.benmanes.caffeine.cache.testing.CacheValidationListener;
 import com.github.benmanes.caffeine.cache.testing.CheckNoStats;
@@ -682,6 +687,32 @@ public void put_replace(AsyncLoadingCache<Integer, Integer> cache, CacheContext
     assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.REPLACED));
   }
 
+  /* ---------------- refresh -------------- */
+
+  @Test(dataProvider = "caches")
+  @CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
+      executor = CacheExecutor.SINGLE, compute = Compute.ASYNC, values = ReferenceType.STRONG)
+  public void refresh(Caffeine<Integer, Integer> builder, CacheContext context) {
+    AtomicBoolean done = new AtomicBoolean();
+    AsyncLoadingCache<Integer, Integer> cache = builder.buildAsync(key -> {
+      await().untilTrue(done);
+      return -key;
+    });
+
+    Integer key = 1;
+    cache.synchronous().put(key, key);
+    CompletableFuture<Integer> original = cache.get(key);
+    for (int i = 0; i < 10; i++) {
+      context.ticker().advance(1, TimeUnit.SECONDS);
+      cache.synchronous().refresh(key);
+
+      CompletableFuture<Integer> next = cache.get(key);
+      assertThat(next, is(sameInstance(original)));
+    }
+    done.set(true);
+    await().until(() -> cache.synchronous().getIfPresent(key), is(-key));
+  }
+
   /* ---------------- serialize -------------- */
 
   @Test(dataProvider = "caches")
diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java
index c83750bd4d..be27f04e08 100644
--- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java
+++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java
@@ -16,18 +16,23 @@
 package com.github.benmanes.caffeine.cache;
 
 import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications;
+import static com.github.benmanes.caffeine.testing.Awaits.await;
 import static com.github.benmanes.caffeine.testing.IsEmptyMap.emptyMap;
 import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
 
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import org.testng.annotations.Listeners;
@@ -38,11 +43,14 @@
 import com.github.benmanes.caffeine.cache.testing.CacheProvider;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population;
+import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
 import com.github.benmanes.caffeine.cache.testing.CacheValidationListener;
 import com.github.benmanes.caffeine.cache.testing.CheckNoWriter;
 import com.github.benmanes.caffeine.cache.testing.RefreshAfterWrite;
@@ -159,6 +167,38 @@ public void get(AsyncLoadingCache<Integer, Integer> cache, CacheContext context)
     assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.REPLACED));
   }
 
+  @Test(dataProvider = "caches")
+  @CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
+      refreshAfterWrite = Expire.ONE_MINUTE, executor = CacheExecutor.SINGLE,
+      compute = Compute.ASYNC, values = ReferenceType.STRONG)
+  public void get_sameFuture(Caffeine<Integer, Integer> builder, CacheContext context) {
+    AtomicBoolean done = new AtomicBoolean();
+    AsyncLoadingCache<Integer, Integer> cache = builder.buildAsync(key -> {
+      await().untilTrue(done);
+      return -key;
+    });
+
+    Integer key = 1;
+    cache.synchronous().put(key, key);
+    CompletableFuture<Integer> original = cache.get(key);
+    for (int i = 0; i < 10; i++) {
+      context.ticker().advance(1, TimeUnit.MINUTES);
+      CompletableFuture<Integer> next = cache.get(key);
+      assertThat(next, is(sameInstance(original)));
+    }
+    done.set(true);
+    await().until(() -> cache.synchronous().getIfPresent(key), is(-key));
+  }
+
+  @Test(dataProvider = "caches")
+  @CacheSpec(refreshAfterWrite = Expire.ONE_MINUTE, loader = Loader.NULL)
+  public void get_null(AsyncLoadingCache<Integer, Integer> cache, CacheContext context) {
+    Integer key = 1;
+    cache.synchronous().put(key, key);
+    context.ticker().advance(2, TimeUnit.MINUTES);
+    await().until(() -> cache.synchronous().getIfPresent(key), is(nullValue()));
+  }
+
   @CheckNoWriter
   @Test(dataProvider = "caches")
   @CacheSpec(refreshAfterWrite = Expire.ONE_MINUTE, loader = Loader.IDENTITY,
diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java
index b6122d844d..5923426de8 100644
--- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java
+++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java
@@ -33,6 +33,7 @@
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.CacheWriter;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.github.benmanes.caffeine.cache.stats.CacheStats;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance;
@@ -85,6 +86,7 @@ public final class CacheContext {
   final boolean isAsyncLoading;
 
   Cache<Integer, Integer> cache;
+  Caffeine<Object, Object> builder;
   AsyncLoadingCache<Integer, Integer> asyncCache;
 
   @Nullable Integer firstKey;
diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheProvider.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheProvider.java
index c44857973c..1d5116de7d 100644
--- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheProvider.java
+++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheProvider.java
@@ -33,6 +33,7 @@
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Policy;
 import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
@@ -83,25 +84,30 @@ private static Iterator<Object[]> asTestCases(Method testMethod,
     Parameter[] parameters = testMethod.getParameters();
     CacheContext[] stashed = new CacheContext[1];
     return scenarios.map(entry -> {
+      CacheContext context = entry.getKey();
+      Cache<Integer, Integer> cache = entry.getValue();
+
       // Retain a strong reference to the context throughout the test execution so that the
       // cache entries are not collected due to the test not accepting the context parameter
-      stashed[0] = entry.getKey();
+      stashed[0] = context;
 
       Object[] params = new Object[parameters.length];
       for (int i = 0; i < params.length; i++) {
         Class<?> clazz = parameters[i].getType();
         if (clazz.isAssignableFrom(CacheContext.class)) {
-          params[i] = entry.getKey();
-        } else if (clazz.isAssignableFrom(entry.getValue().getClass())) {
-          params[i] = entry.getValue(); // Cache or LoadingCache
+          params[i] = context;
+        } else if (clazz.isAssignableFrom(Caffeine.class)) {
+          params[i] = context.builder;
+        } else if (clazz.isAssignableFrom(cache.getClass())) {
+          params[i] = cache;
         } else if (clazz.isAssignableFrom(AsyncLoadingCache.class)) {
-          params[i] = entry.getKey().asyncCache;
+          params[i] = context.asyncCache;
         } else if (clazz.isAssignableFrom(Map.class)) {
-          params[i] = entry.getValue().asMap();
+          params[i] = cache.asMap();
         } else if (clazz.isAssignableFrom(Policy.Eviction.class)) {
-          params[i] = entry.getValue().policy().eviction().get();
+          params[i] = cache.policy().eviction().get();
         } else if (clazz.isAssignableFrom(Policy.Expiration.class)) {
-          params[i] = expirationPolicy(parameters[i], entry);
+          params[i] = expirationPolicy(parameters[i], cache);
         }
         if (params[i] == null) {
           checkNotNull(params[i], "Unknown parameter type: %s", clazz);
@@ -113,13 +119,13 @@ private static Iterator<Object[]> asTestCases(Method testMethod,
 
   /** Returns the expiration policy for the given parameter. */
   private static Policy.Expiration<Integer, Integer> expirationPolicy(
-      Parameter parameter, Entry<CacheContext, Cache<Integer, Integer>> entry) {
+      Parameter parameter, Cache<Integer, Integer> cache) {
     if (parameter.isAnnotationPresent(ExpireAfterAccess.class)) {
-      return entry.getValue().policy().expireAfterAccess().get();
+      return cache.policy().expireAfterAccess().get();
     } else if (parameter.isAnnotationPresent(ExpireAfterWrite.class)) {
-      return entry.getValue().policy().expireAfterWrite().get();
+      return cache.policy().expireAfterWrite().get();
     } else if (parameter.isAnnotationPresent(RefreshAfterWrite.class)) {
-      return entry.getValue().policy().refreshAfterWrite().get();
+      return cache.policy().refreshAfterWrite().get();
     }
     throw new AssertionError("Expiration parameter must have a qualifier annotation");
   }
diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java
index 206aefc1fe..09bc6b64c9 100644
--- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java
+++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java
@@ -39,6 +39,8 @@ private CaffeineCacheFromContext() {}
 
   public static <K, V> Cache<K, V> newCaffeineCache(CacheContext context) {
     Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    context.builder = builder;
+
     if (context.initialCapacity != InitialCapacity.DEFAULT) {
       builder.initialCapacity(context.initialCapacity.size());
     }
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 62997896e8..c2d3569d0f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -37,7 +37,7 @@ ext {
     jsr305: '3.0.1',
     stream: '2.9.1',
     univocity_parsers: '2.0.0',
-    ycsb: '0.7.0-RC2',
+    ycsb: '0.7.0',
   ]
   test_versions = [
     awaitility: '1.7.0',
@@ -62,11 +62,11 @@ ext {
     jamm: '0.3.1',
     java_object_layout: '0.4',
     koloboke: '0.6.8',
-    slf4j: '1.7.16',
+    slf4j: '1.7.18',
     tcache: '0.9.0',
   ]
   plugin_versions = [
-    checkstyle: '6.15',
+    checkstyle: '6.16',
     coveralls: '2.6.3',
     extra_conf: '3.0.3',
     error_prone: '0.0.8',
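As a closing note on the mechanism: the refreshIfNeeded hunk above claims a
refresh by CAS'ing the entry's write time to a sentinel (Long.MAX_VALUE for an
async cache, the read timestamp otherwise) and lets the reloaded value commit
only if that sentinel is still in place. The following standalone sketch of the
claim-then-commit idea uses hypothetical names, no Caffeine types, and simplifies
the real node-based bookkeeping:

import java.util.concurrent.atomic.AtomicLong;

// Minimal model of the pattern: claim the refresh by CAS'ing the timestamp to a
// sentinel, then commit the reloaded value only if no other write raced in between.
final class RefreshClaim {
  private final AtomicLong writeTime = new AtomicLong();

  boolean tryClaim(long now, long refreshNanos, long sentinel) {
    long last = writeTime.get();
    return ((now - last) > refreshNanos) && writeTime.compareAndSet(last, sentinel);
  }

  boolean commitIfUnchanged(long sentinel, long now) {
    // A concurrent write overwrites the sentinel, so this CAS fails and the
    // stale reload is discarded instead of clobbering the newer value.
    return writeTime.compareAndSet(sentinel, now);
  }
}

An intervening put() advances the entry's write time, so the in-flight refresh
observes a mismatch and keeps the newer value rather than clobbering it, which is
the behavior the commit message describes.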