Skip to content

Commit

Permalink
Fix async refresh from prematurely replacing the value
Browse files Browse the repository at this point in the history
When a refresh-after-write is triggered the entry should be reloaded in
the background, the present value available for reads, and atomically
replaced when the new value has been loaded. This operation is atomic
with other writes to that entry and is blocking (rather than clobbering)
if another write is attempted.

This should work the same for synchronous and asynchronous caches, but
unfortunately it wasn't. For an asycn cache a new, incomplete future
was immediately put into the cache and available to be consumed by the
next request. Due to layering, the custom reloadAsync was not called and
the operation delegated to load instead. This was of course wrong and
not the intended (or expected) behavior, so it is now fixed.

Thanks to Etienne Houle @ Stingray for notifying me of this problem.
  • Loading branch information
ben-manes committed Feb 29, 2016
1 parent 92f9c59 commit 49fed1e
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,15 @@ public final class CacheProfiler extends ProfilerHook {
final boolean reads;

CacheProfiler() {
ints = new Integer[SIZE];
cache = cacheType.create(2 * SIZE);

// Ensure full initialization of internal structures
for (int i = 0; i < 2 * SIZE; i++) {
cache.put(i, Boolean.TRUE);
}
cache.clear();

ints = new Integer[SIZE];
NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
for (int i = 0; i < SIZE; i++) {
ints[i] = generator.nextValue().intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* for a given configuration are used.
*
* @author [email protected] (Ben Manes)
* @param <K> the type of keys maintained by this map
* @param <K> the type of keys maintained by this cache
* @param <V> the type of mapped values
*/
@ThreadSafe
Expand Down Expand Up @@ -727,8 +727,6 @@ void afterRead(Node<K, V> node, long now, boolean recordHit) {
}
node.setAccessTime(now);

// fastpath is disabled due to unfavorable benchmarks
// boolean delayable = canFastpath(node) || (readBuffer.offer(node) != Buffer.FULL);
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
Expand All @@ -751,20 +749,34 @@ void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}
long writeTime = node.getWriteTime();
if (((now - writeTime) > refreshAfterWriteNanos()) && node.casWriteTime(writeTime, now)) {
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = isAsync ? Long.MAX_VALUE : now;
if (((now - oldWriteTime) > refreshAfterWriteNanos())
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
try {
executor().execute(() -> {
K key = node.getKey();
if ((key != null) && node.isAlive()) {
BiFunction<? super K, ? super V, ? extends V> refreshFunction = (k, v) -> {
if (node.getWriteTime() != now) {
if (node.getWriteTime() != refreshWriteTime) {
return v;
}
try {
if (isAsync) {
@SuppressWarnings("unchecked")
V oldValue = ((CompletableFuture<V>) v).join();
CompletableFuture<V> future =
cacheLoader.asyncReload(key, oldValue, Runnable::run);
if (future.join() == null) {
return null;
}
@SuppressWarnings("unchecked")
V castFuture = (V) future;
return castFuture;
}
return cacheLoader.reload(k, v);
} catch (Exception e) {
node.setWriteTime(writeTime);
node.setWriteTime(oldWriteTime);
return LocalCache.throwUnchecked(e);
}
};
Expand Down Expand Up @@ -2916,26 +2928,40 @@ static final class BoundedLocalAsyncLoadingCache<K, V>

@SuppressWarnings("unchecked")
BoundedLocalAsyncLoadingCache(Caffeine<K, V> builder, AsyncCacheLoader<? super K, V> loader) {
super(LocalCacheFactory.newBoundedLocalCache((Caffeine<K, CompletableFuture<V>>) builder,
asyncLoader(loader, builder), true), loader);
super((BoundedLocalCache<K, CompletableFuture<V>>) LocalCacheFactory.newBoundedLocalCache(
builder, asyncLoader(loader, builder), true), loader);
isWeighted = builder.isWeighted();
}

private static <K, V> CacheLoader<? super K, CompletableFuture<V>> asyncLoader(
private static <K, V> CacheLoader<K, V> asyncLoader(
AsyncCacheLoader<? super K, V> loader, Caffeine<?, ?> builder) {
Executor executor = builder.getExecutor();
return key -> loader.asyncLoad(key, executor);
return new CacheLoader<K, V>() {
@Override public V load(K key) {
@SuppressWarnings("unchecked")
V newValue = (V) loader.asyncLoad(key, executor);
return newValue;
}
@Override public V reload(K key, V oldValue) {
@SuppressWarnings("unchecked")
V newValue = (V) loader.asyncReload(key, oldValue, executor);
return newValue;
}
@Override public CompletableFuture<V> asyncReload(K key, V oldValue, Executor executor) {
return loader.asyncReload(key, oldValue, executor);
}
};
}

@Override
protected Policy<K, V> policy() {
if (policy == null) {
@SuppressWarnings("unchecked")
BoundedLocalCache<K, V> castedCache = (BoundedLocalCache<K, V>) cache;
BoundedLocalCache<K, V> castCache = (BoundedLocalCache<K, V>) cache;
Function<CompletableFuture<V>, V> transformer = Async::getIfReady;
@SuppressWarnings("unchecked")
Function<V, V> castedTransformer = (Function<V, V>) transformer;
policy = new BoundedPolicy<>(castedCache, castedTransformer, isWeighted);
Function<V, V> castTransformer = (Function<V, V>) transformer;
policy = new BoundedPolicy<>(castCache, castTransformer, isWeighted);
}
return policy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
@Nullable final RemovalListener<K, V> removalListener;
final ConcurrentHashMap<K, V> data;
final StatsCounter statsCounter;
final boolean isRecordingStats;
final CacheWriter<K, V> writer;
final Executor executor;
final Ticker ticker;
Expand All @@ -61,9 +63,6 @@ final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
transient Collection<V> values;
transient Set<Entry<K, V>> entrySet;

boolean isRecordingStats;
StatsCounter statsCounter;

UnboundedLocalCache(Caffeine<? super K, ? super V> builder, boolean async) {
this.data = new ConcurrentHashMap<>(builder.getInitialCapacity());
this.statsCounter = builder.getStatsCounterSupplier().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.github.benmanes.caffeine.cache.testing.HasStats.hasLoadFailureCount;
import static com.github.benmanes.caffeine.cache.testing.HasStats.hasLoadSuccessCount;
import static com.github.benmanes.caffeine.cache.testing.HasStats.hasMissCount;
import static com.github.benmanes.caffeine.testing.Awaits.await;
import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
Expand All @@ -30,6 +31,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -49,10 +51,13 @@
import com.github.benmanes.caffeine.cache.testing.CacheProvider;
import com.github.benmanes.caffeine.cache.testing.CacheSpec;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer;
import com.github.benmanes.caffeine.cache.testing.CacheValidationListener;
import com.github.benmanes.caffeine.cache.testing.CheckNoStats;
Expand Down Expand Up @@ -682,6 +687,32 @@ public void put_replace(AsyncLoadingCache<Integer, Integer> cache, CacheContext
assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.REPLACED));
}

/* ---------------- refresh -------------- */

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
executor = CacheExecutor.SINGLE, compute = Compute.ASYNC, values = ReferenceType.STRONG)
public void refresh(Caffeine<Integer, Integer> builder, CacheContext context) {
AtomicBoolean done = new AtomicBoolean();
AsyncLoadingCache<Integer, Integer> cache = builder.buildAsync(key -> {
await().untilTrue(done);
return -key;
});

Integer key = 1;
cache.synchronous().put(key, key);
CompletableFuture<Integer> original = cache.get(key);
for (int i = 0; i < 10; i++) {
context.ticker().advance(1, TimeUnit.SECONDS);
cache.synchronous().refresh(key);

CompletableFuture<Integer> next = cache.get(key);
assertThat(next, is(sameInstance(original)));
}
done.set(true);
await().until(() -> cache.synchronous().getIfPresent(key), is(-key));
}

/* ---------------- serialize -------------- */

@Test(dataProvider = "caches")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
package com.github.benmanes.caffeine.cache;

import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications;
import static com.github.benmanes.caffeine.testing.Awaits.await;
import static com.github.benmanes.caffeine.testing.IsEmptyMap.emptyMap;
import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import org.testng.annotations.Listeners;
Expand All @@ -38,11 +43,14 @@
import com.github.benmanes.caffeine.cache.testing.CacheProvider;
import com.github.benmanes.caffeine.cache.testing.CacheSpec;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
import com.github.benmanes.caffeine.cache.testing.CacheValidationListener;
import com.github.benmanes.caffeine.cache.testing.CheckNoWriter;
import com.github.benmanes.caffeine.cache.testing.RefreshAfterWrite;
Expand Down Expand Up @@ -159,6 +167,38 @@ public void get(AsyncLoadingCache<Integer, Integer> cache, CacheContext context)
assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.REPLACED));
}

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
refreshAfterWrite = Expire.ONE_MINUTE, executor = CacheExecutor.SINGLE,
compute = Compute.ASYNC, values = ReferenceType.STRONG)
public void get_sameFuture(Caffeine<Integer, Integer> builder, CacheContext context) {
AtomicBoolean done = new AtomicBoolean();
AsyncLoadingCache<Integer, Integer> cache = builder.buildAsync(key -> {
await().untilTrue(done);
return -key;
});

Integer key = 1;
cache.synchronous().put(key, key);
CompletableFuture<Integer> original = cache.get(key);
for (int i = 0; i < 10; i++) {
context.ticker().advance(1, TimeUnit.MINUTES);
CompletableFuture<Integer> next = cache.get(key);
assertThat(next, is(sameInstance(original)));
}
done.set(true);
await().until(() -> cache.synchronous().getIfPresent(key), is(-key));
}

@Test(dataProvider = "caches")
@CacheSpec(refreshAfterWrite = Expire.ONE_MINUTE, loader = Loader.NULL)
public void get_null(AsyncLoadingCache<Integer, Integer> cache, CacheContext context) {
Integer key = 1;
cache.synchronous().put(key, key);
context.ticker().advance(2, TimeUnit.MINUTES);
await().until(() -> cache.synchronous().getIfPresent(key), is(nullValue()));
}

@CheckNoWriter
@Test(dataProvider = "caches")
@CacheSpec(refreshAfterWrite = Expire.ONE_MINUTE, loader = Loader.IDENTITY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheWriter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance;
Expand Down Expand Up @@ -85,6 +86,7 @@ public final class CacheContext {
final boolean isAsyncLoading;

Cache<?, ?> cache;
Caffeine<Object, Object> builder;
AsyncLoadingCache<?, ?> asyncCache;

@Nullable Integer firstKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
Expand Down Expand Up @@ -83,25 +84,30 @@ private static Iterator<Object[]> asTestCases(Method testMethod,
Parameter[] parameters = testMethod.getParameters();
CacheContext[] stashed = new CacheContext[1];
return scenarios.map(entry -> {
CacheContext context = entry.getKey();
Cache<Integer, Integer> cache = entry.getValue();

// Retain a strong reference to the context throughout the test execution so that the
// cache entries are not collected due to the test not accepting the context parameter
stashed[0] = entry.getKey();
stashed[0] = context;

Object[] params = new Object[parameters.length];
for (int i = 0; i < params.length; i++) {
Class<?> clazz = parameters[i].getType();
if (clazz.isAssignableFrom(CacheContext.class)) {
params[i] = entry.getKey();
} else if (clazz.isAssignableFrom(entry.getValue().getClass())) {
params[i] = entry.getValue(); // Cache or LoadingCache
params[i] = context;
} else if (clazz.isAssignableFrom(Caffeine.class)) {
params[i] = context.builder;
} else if (clazz.isAssignableFrom(cache.getClass())) {
params[i] = cache;
} else if (clazz.isAssignableFrom(AsyncLoadingCache.class)) {
params[i] = entry.getKey().asyncCache;
params[i] = context.asyncCache;
} else if (clazz.isAssignableFrom(Map.class)) {
params[i] = entry.getValue().asMap();
params[i] = cache.asMap();
} else if (clazz.isAssignableFrom(Policy.Eviction.class)) {
params[i] = entry.getValue().policy().eviction().get();
params[i] = cache.policy().eviction().get();
} else if (clazz.isAssignableFrom(Policy.Expiration.class)) {
params[i] = expirationPolicy(parameters[i], entry);
params[i] = expirationPolicy(parameters[i], cache);
}
if (params[i] == null) {
checkNotNull(params[i], "Unknown parameter type: %s", clazz);
Expand All @@ -113,13 +119,13 @@ private static Iterator<Object[]> asTestCases(Method testMethod,

/** Returns the expiration policy for the given parameter. */
private static Policy.Expiration<Integer, Integer> expirationPolicy(
Parameter parameter, Entry<CacheContext, Cache<Integer, Integer>> entry) {
Parameter parameter, Cache<Integer, Integer> cache) {
if (parameter.isAnnotationPresent(ExpireAfterAccess.class)) {
return entry.getValue().policy().expireAfterAccess().get();
return cache.policy().expireAfterAccess().get();
} else if (parameter.isAnnotationPresent(ExpireAfterWrite.class)) {
return entry.getValue().policy().expireAfterWrite().get();
return cache.policy().expireAfterWrite().get();
} else if (parameter.isAnnotationPresent(RefreshAfterWrite.class)) {
return entry.getValue().policy().refreshAfterWrite().get();
return cache.policy().refreshAfterWrite().get();
}
throw new AssertionError("Expiration parameter must have a qualifier annotation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ private CaffeineCacheFromContext() {}

public static <K, V> Cache<K, V> newCaffeineCache(CacheContext context) {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
context.builder = builder;

if (context.initialCapacity != InitialCapacity.DEFAULT) {
builder.initialCapacity(context.initialCapacity.size());
}
Expand Down
6 changes: 3 additions & 3 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ext {
jsr305: '3.0.1',
stream: '2.9.1',
univocity_parsers: '2.0.0',
ycsb: '0.7.0-RC2',
ycsb: '0.7.0',
]
test_versions = [
awaitility: '1.7.0',
Expand All @@ -62,11 +62,11 @@ ext {
jamm: '0.3.1',
java_object_layout: '0.4',
koloboke: '0.6.8',
slf4j: '1.7.16',
slf4j: '1.7.18',
tcache: '0.9.0',
]
plugin_versions = [
checkstyle: '6.15',
checkstyle: '6.16',
coveralls: '2.6.3',
extra_conf: '3.0.3',
error_prone: '0.0.8',
Expand Down

0 comments on commit 49fed1e

Please sign in to comment.