Skip to content

Commit

Permalink
Return refresh future and ignore redundant refreshes
Browse files Browse the repository at this point in the history
A mapping of in-flight refreshes is now maintained and lazily
initialized if not used. This allows the cache to ignore redundant
requests for reloads, like Guava does. It also removes disablement
of expiration during refresh and resolves an ABA problem if the
entry is modified in an undectectable way. The refresh future can
now be obtained from LoadingCache to chain operations against.

TODO: unit tests for these changes

fixes #143
fixes #193
fixes #236
fixes #282
fixes #322
fixed #373
fixes #467
  • Loading branch information
ben-manes committed Jan 2, 2021
1 parent d1d2b23 commit 5a5e37c
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -220,10 +221,10 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
final Executor executor;
final boolean isAsync;

// The collection views
@Nullable transient Set<K> keySet;
@Nullable transient Collection<V> values;
@Nullable transient Set<Entry<K, V>> entrySet;
@Nullable Set<K> keySet;
@Nullable Collection<V> values;
@Nullable Set<Entry<K, V>> entrySet;
AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;

/** Creates an instance based on the builder's configuration. */
protected BoundedLocalCache(Caffeine<K, V> builder,
Expand All @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
executor = builder.getExecutor();
writer = builder.getCacheWriter();
evictionLock = new ReentrantLock();
refreshes = new AtomicReference<>();
weigher = builder.getWeigher(isAsync);
drainBuffersTask = new PerformCleanupTask(this);
nodeFactory = NodeFactory.newFactory(builder, isAsync);
Expand Down Expand Up @@ -288,11 +290,29 @@ public final Executor executor() {
return executor;
}

@Override
@SuppressWarnings("NullAway")
public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
var pending = refreshes.get();
if (pending == null) {
pending = new ConcurrentHashMap<>();
if (!refreshes.compareAndSet(null, pending)) {
pending = refreshes.get();
}
}
return pending;
}

/** Returns whether this cache notifies a writer when an entry is modified. */
protected boolean hasWriter() {
return (writer != CacheWriter.disabledWriter());
}

@Override
public Object referenceKey(K key) {
return nodeFactory.newLookupKey(key);
}

/* --------------- Stats Support --------------- */

@Override
Expand Down Expand Up @@ -899,8 +919,9 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause[] actualCause = new RemovalCause[1];
Object keyReference = node.getKeyReference();

data.computeIfPresent(node.getKeyReference(), (k, n) -> {
data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
return n;
}
Expand Down Expand Up @@ -965,6 +986,12 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {

if (removed[0]) {
statsCounter().recordEviction(node.getWeight(), actualCause[0]);

var pending = refreshes.get();
if (pending != null) {
pending.remove(keyReference);
}

if (hasRemovalListener()) {
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safe guard against the executor rejecting the notification task.
Expand Down Expand Up @@ -1172,70 +1199,82 @@ void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}

K key;
V oldValue;
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = (now + ASYNC_EXPIRY);
if (((now - oldWriteTime) > refreshAfterWriteNanos())
long writeTime = node.getWriteTime();
Object keyReference = node.getKeyReference();
if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
try {
CompletableFuture<V> refreshFuture;
long startTime = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = future.thenCompose(value ->
cacheLoader.asyncReload(key, value, executor));
refreshFuture = refresh;
} else {
// no-op if load is pending
node.casWriteTime(refreshWriteTime, oldWriteTime);
return;
}
} else {
&& !refreshes().containsKey(keyReference)) {
var reloading = new CompletableFuture<?>[1];
refreshes().computeIfAbsent(keyReference, k -> {
reloading[0] = reload(node, key, oldValue, writeTime);
return reloading[0];
});
if (reloading[0] != null) {
reloading[0].whenComplete((r, e) -> refreshes().remove(keyReference, reloading[0]));
}
}
}

@SuppressWarnings("FutureReturnValueIgnored")
private @Nullable CompletableFuture<V> reload(Node<K, V> node,
K key, V oldValue, long writeTime) {
try {
CompletableFuture<V> refreshFuture;
long startTime = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
var refresh = cacheLoader.asyncReload(key, future.join(), executor);
refreshFuture = refresh;
} else {
// no-op if load is pending
return future;
}
} else {
@SuppressWarnings("NullAway")
var refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture = refresh;
}
refreshFuture.whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime;
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
statsCounter().recordLoadFailure(loadTime);
return;
}
refreshFuture.whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime;
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
node.casWriteTime(refreshWriteTime, oldWriteTime);
statsCounter().recordLoadFailure(loadTime);
return;
}

@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;

boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
return value;
} else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
return value;
}
discard[0] = true;
return currentValue;
}, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && hasRemovalListener()) {
notifyRemoval(key, value, RemovalCause.REPLACED);
}
if (newValue == null) {
statsCounter().recordLoadFailure(loadTime);
} else {
statsCounter().recordLoadSuccess(loadTime);
@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;

boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
return value;
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
return value;
}
});
} catch (Throwable t) {
node.casWriteTime(refreshWriteTime, oldWriteTime);
logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
}
discard[0] = true;
return currentValue;
}, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && hasRemovalListener()) {
notifyRemoval(key, value, RemovalCause.REPLACED);
}
if (newValue == null) {
statsCounter().recordLoadFailure(loadTime);
} else {
statsCounter().recordLoadSuccess(loadTime);
}
});
return refreshFuture;
} catch (Throwable t) {
logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
return null;
}
}

Expand Down Expand Up @@ -1782,8 +1821,12 @@ public void clear() {
}

// Discard all entries
for (Node<K, V> node : data.values()) {
removeNode(node, now);
var pending = refreshes.get();
for (var entry : data.entrySet()) {
removeNode(entry.getValue(), now);
if (pending != null) {
pending.remove(entry.getKey());
}
}

// Discard all pending reads
Expand Down Expand Up @@ -2099,8 +2142,9 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> {
data.computeIfPresent(lookupKey, (k, n) -> {
synchronized (n) {
oldValue[0] = n.getValue();
if (oldValue[0] == null) {
Expand All @@ -2118,6 +2162,11 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
});

if (cause[0] != null) {
var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}

afterWrite(new RemovalTask(node[0]));
if (hasRemovalListener()) {
notifyRemoval(castKey, oldValue[0], cause[0]);
Expand All @@ -2140,8 +2189,9 @@ public boolean remove(Object key, Object value) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> {
data.computeIfPresent(lookupKey, (kR, node) -> {
synchronized (node) {
oldKey[0] = node.getKey();
oldValue[0] = node.getValue();
Expand All @@ -2163,7 +2213,13 @@ public boolean remove(Object key, Object value) {

if (removed[0] == null) {
return false;
} else if (hasRemovalListener()) {
}

var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}
if (hasRemovalListener()) {
notifyRemoval(oldKey[0], oldValue[0], cause[0]);
}
afterWrite(new RemovalTask(removed[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.benmanes.caffeine.cache;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -101,9 +102,13 @@ public interface LoadingCache<K, V> extends Cache<K, V> {
* Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
* currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
* is asynchronous by delegating to the default executor.
* <p>
* Returns an existing future without doing anything if another thread is currently loading the
* value for {@code key}.
*
* @param key key with which a value may be associated
* @return the future that is loading the value
* @throws NullPointerException if the specified key is null
*/
void refresh(@NonNull K key);
CompletableFuture<V> refresh(@NonNull K key);
}
Loading

0 comments on commit 5a5e37c

Please sign in to comment.