Skip to content

Commit

Permalink
Avoid notifying the removal listener for no-op replacements (fixes #593)
Browse files Browse the repository at this point in the history
When a write operation replaces an entry's value with the same instance
then a removal notification should not be published. Otherwise the
listener may perform some cleanup work by incorrectly assuming that the
value is no longer in the cache. That may be a resource being closed. We
deviate from Guava in this regard which overlooked that case and our
semantics are documented in the migration guide.

Unfortunately there were cases when this suppression was not performed.
When the entry is refreshed or if using an async cache then some cases
might notify, while others would not. Note that an equal but different
instance will be notified on as suppression is identity based.

The additional tests uncovered a bug with the Map.entrySet()'s
contains(o) method for weak/soft values. This used object equality, but
should have used identity to be consistent for reference caching. This
behavior now matches Guava.
  • Loading branch information
ben-manes committed Sep 12, 2021
1 parent 4f6ec75 commit c42ab53
Show file tree
Hide file tree
Showing 20 changed files with 709 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -261,7 +260,12 @@ protected BoundedLocalCache(Caffeine<K, V> builder,

/* --------------- Shared --------------- */

/** Returns if the node's value is currently being computed, asynchronously. */
@Override
public boolean isAsync() {
return isAsync;
}

/** Returns if the node's value is currently being computed asynchronously. */
final boolean isComputingAsync(Node<?, ?> node) {
return isAsync && !Async.isReady((CompletableFuture<?>) node.getValue());
}
Expand Down Expand Up @@ -1243,8 +1247,8 @@ void refreshIfNeeded(Node<K, V> node, long now) {
Object keyReference = node.getKeyReference();
if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& !refreshes().containsKey(keyReference)
&& ((writeTime & 1L) == 0L) && node.casWriteTime(writeTime, refreshWriteTime)) {
&& ((writeTime & 1L) == 0L) && !refreshes().containsKey(keyReference)
&& node.casWriteTime(writeTime, refreshWriteTime)) {
long[] startTime = new long[1];
@SuppressWarnings({"unchecked", "rawtypes"})
CompletableFuture<? extends V>[] refreshFuture = new CompletableFuture[1];
Expand Down Expand Up @@ -1298,14 +1302,21 @@ void refreshIfNeeded(Node<K, V> node, long now) {
boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
if (value == null) {
return null;
} else if (refreshes().get(key) == refreshFuture[0]) {
return value;
}
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (value != null);
return null;
} else if (currentValue == value) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (isAsync &&
(newValue == Async.getIfReady((CompletableFuture<?>) currentValue))) {
// If the completed futures hold the same value instance then no-op
return currentValue;
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
// If the entry was not modified while in-flight (no ABA) then replace
return value;
}
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, expiry(), /* recordMiss */ false,
Expand Down Expand Up @@ -2178,8 +2189,8 @@ public Map<K, V> getAllPresent(Iterable<? extends K> keys) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate && (value != oldValue)) {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
} else if (mayUpdate) {
notifyOnReplace(key, oldValue, value);
}

int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
Expand Down Expand Up @@ -2330,9 +2341,7 @@ public boolean remove(Object key, Object value) {
afterRead(node, now[0], /* recordHit */ false);
}

if (value != oldValue[0]) {
notifyRemoval(nodeKey[0], oldValue[0], RemovalCause.REPLACED);
}
notifyOnReplace(nodeKey[0], oldValue[0], value);
return oldValue[0];
}

Expand Down Expand Up @@ -2384,9 +2393,7 @@ public boolean replace(K key, V oldValue, V newValue) {
afterRead(node, now[0], /* recordHit */ false);
}

if (oldValue != newValue) {
notifyRemoval(nodeKey[0], prevValue[0], RemovalCause.REPLACED);
}
notifyOnReplace(nodeKey[0], prevValue[0], newValue);
return true;
}

Expand Down Expand Up @@ -2500,8 +2507,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
return null;
}
if (cause[0] != null) {
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
statsCounter().recordEviction(weight[0], cause[0]);
if (cause[0] == RemovalCause.REPLACED) {
notifyOnReplace(key, oldValue[0], newValue[0]);
} else {
if (cause[0].wasEvicted()) {
statsCounter().recordEviction(weight[0], cause[0]);
}
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
}
}
if (newValue[0] == null) {
if (!isComputingAsync(node)) {
Expand Down Expand Up @@ -2675,10 +2688,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
});

if (cause[0] != null) {
if (cause[0].wasEvicted()) {
statsCounter().recordEviction(weight[0], cause[0]);
if (cause[0] == RemovalCause.REPLACED) {
notifyOnReplace(key, oldValue[0], newValue[0]);
} else {
if (cause[0].wasEvicted()) {
statsCounter().recordEviction(weight[0], cause[0]);
}
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
}
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
}

if (removed[0] != null) {
Expand Down Expand Up @@ -3188,7 +3205,7 @@ public boolean contains(Object obj) {
}
var entry = (Entry<?, ?>) obj;
Node<K, V> node = cache.data.get(cache.nodeFactory.newLookupKey(entry.getKey()));
return (node != null) && Objects.equals(node.getValue(), entry.getValue());
return (node != null) && node.containsValue(entry.getValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
var castedFuture = (CompletableFuture<V>) future;
if (refreshed[0]) {
castedFuture.whenComplete((newValue, error) -> {
boolean removed = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
asyncCache.cache().refreshes().remove(keyReference, castedFuture);
long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
Expand All @@ -290,12 +290,17 @@ public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
if (currentValue == oldValueFuture[0]) {
if (currentValue == null) {
if (newValue == null) {
return null;
} else if (removed) {
return castedFuture;
}
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (newValue != null);
return null;
} else if (currentValue == newValue) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
// If the completed futures hold the same value instance then no-op
return currentValue;
} else {
// If the entry was not modified while in-flight (no ABA) then replace
long expectedWriteTime = writeTime[0];
if (asyncCache.cache().hasWriteTime()) {
asyncCache.cache().getIfPresentQuietly(key, writeTime);
Expand All @@ -305,6 +310,7 @@ public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
}
}
}
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, asyncCache.cache().expiry(), /* recordMiss */ false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
*/
interface LocalCache<K, V> extends ConcurrentMap<K, V> {

/** Returns whether this cache is asynchronous. */
boolean isAsync();

/** Returns whether this cache has statistics enabled. */
boolean isRecordingStats();

Expand Down Expand Up @@ -121,6 +124,30 @@ default void invalidateAll(Iterable<?> keys) {
/** See {@link Cache#cleanUp}. */
void cleanUp();

/** Notify the removal listener of a replacement if the value reference was changed. */
@SuppressWarnings("FutureReturnValueIgnored")
default void notifyOnReplace(K key, V oldValue, V newValue) {
if ((oldValue == null) || (oldValue == newValue)) {
return;
} else if (isAsync()) {
var oldFuture = (CompletableFuture<?>) oldValue;
var newFuture = (CompletableFuture<?>) newValue;
newFuture.whenCompleteAsync((nv, e) -> {
if (e == null) {
oldFuture.thenAcceptAsync(ov -> {
if (nv != ov) {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
}, executor());
} else {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
}, executor());
} else {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
}

/** Decorates the remapping function to record statistics if enabled. */
default <T, R> Function<? super T, ? extends R> statsAware(
Function<? super T, ? extends R> mappingFunction, boolean recordLoad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ default CompletableFuture<V> refresh(K key) {
}
}
}
discard[0] = true;
discard[0] = (currentValue != newValue);
return currentValue;
}, cache().expiry(), /* recordMiss */ false,
/* recordLoad */ false, /* recordLoadFailure */ true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,22 @@ final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
final StatsCounter statsCounter;
final boolean isRecordingStats;
final Executor executor;
final boolean isAsync;
final Ticker ticker;

@Nullable Set<K> keySet;
@Nullable Collection<V> values;
@Nullable Set<Entry<K, V>> entrySet;
@Nullable volatile ConcurrentMap<Object, CompletableFuture<?>> refreshes;

UnboundedLocalCache(Caffeine<? super K, ? super V> builder, boolean async) {
UnboundedLocalCache(Caffeine<? super K, ? super V> builder, boolean isAsync) {
this.data = new ConcurrentHashMap<>(builder.getInitialCapacity());
this.statsCounter = builder.getStatsCounterSupplier().get();
this.removalListener = builder.getRemovalListener(async);
this.removalListener = builder.getRemovalListener(isAsync);
this.isRecordingStats = builder.isRecordingStats();
this.executor = builder.getExecutor();
this.ticker = builder.getTicker();
this.isAsync = isAsync;
}

static {
Expand All @@ -90,6 +92,11 @@ final class UnboundedLocalCache<K, V> implements LocalCache<K, V> {
}
}

@Override
public boolean isAsync() {
return isAsync;
}

@Override
@SuppressWarnings("NullAway")
public Expiry<K, V> expiry() {
Expand Down Expand Up @@ -308,22 +315,24 @@ public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction
// ensures that the removal notification is processed after the removal has completed
@SuppressWarnings({"unchecked", "rawtypes"})
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
boolean[] replaced = new boolean[1];
V nv = data.computeIfPresent(key, (K k, V value) -> {
BiFunction<? super K, ? super V, ? extends V> function = statsAware(remappingFunction,
/* recordMiss */ false, /* recordLoad */ true, /* recordLoadFailure */ true);
V newValue = function.apply(k, value);

cause[0] = (newValue == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
replaced[0] = (newValue != null);
if (newValue != value) {
oldValue[0] = value;
}

discardRefresh(k);
return newValue;
});
if (oldValue[0] != null) {
notifyRemoval(key, oldValue[0], cause[0]);
if (replaced[0]) {
notifyOnReplace(key, oldValue[0], nv);
} else if (oldValue[0] != null) {
notifyRemoval(key, oldValue[0], RemovalCause.EXPLICIT);
}
return nv;
}
Expand Down Expand Up @@ -356,23 +365,25 @@ V remap(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)
// ensures that the removal notification is processed after the removal has completed
@SuppressWarnings({"unchecked", "rawtypes"})
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
boolean[] replaced = new boolean[1];
V nv = data.compute(key, (K k, V value) -> {
V newValue = remappingFunction.apply(k, value);
if ((value == null) && (newValue == null)) {
return null;
}

cause[0] = (newValue == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
replaced[0] = (newValue != null);
if ((value != null) && (newValue != value)) {
oldValue[0] = value;
}

discardRefresh(k);
return newValue;
});
if (oldValue[0] != null) {
notifyRemoval(key, oldValue[0], cause[0]);
if (replaced[0]) {
notifyOnReplace(key, oldValue[0], nv);
} else if (oldValue[0] != null) {
notifyRemoval(key, oldValue[0], RemovalCause.EXPLICIT);
}
return nv;
}
Expand Down Expand Up @@ -421,9 +432,7 @@ public boolean containsValue(Object value) {

// ensures that the removal notification is processed after the removal has completed
V oldValue = data.put(key, value);
if ((oldValue != null) && (oldValue != value)) {
notifyRemoval(key, oldValue, RemovalCause.REPLACED);
}
notifyOnReplace(key, oldValue, value);
return oldValue;
}

Expand Down
Loading

0 comments on commit c42ab53

Please sign in to comment.