Skip to content

Commit

Permalink
Fix async expiration race for create + read (fixes #298)
Browse files Browse the repository at this point in the history
When the future is in-flight, the expiration is dsabled by using an
infinite timestamp. When finished, a callback updates it through a
no-op write and called expireAfterCreate. If a read comes in after
the future is done but before the timestamp is reset, it was allowed
to set the value. This could cause an ABA race where the infinite
timestamp was read, replaced, and written back. This fix now avoids
that by using guards to skip the read and CAS when updating.
  • Loading branch information
ben-manes committed Feb 23, 2019
1 parent 45feb12 commit 801e532
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,28 @@ private void addLink(String method, String varName) {
private void addVariableTime(String varName) {
MethodSpec getter = MethodSpec.methodBuilder("getVariableTime")
.addModifiers(Modifier.PUBLIC)
.addStatement("return $N", varName)
.addStatement("return $T.UNSAFE.getLong(this, $N)",
UNSAFE_ACCESS, offsetName(varName))
.returns(long.class)
.build();
MethodSpec setter = MethodSpec.methodBuilder("setVariableTime")
.addModifiers(Modifier.PUBLIC)
.addParameter(long.class, varName)
.addStatement("this.$N = $N", varName, varName)
.addStatement("$T.UNSAFE.putLong(this, $N, $N)",
UNSAFE_ACCESS, offsetName(varName), varName)
.build();
MethodSpec cas = MethodSpec.methodBuilder("casVariableTime")
.addModifiers(Modifier.PUBLIC)
.addParameter(long.class, "expect")
.addParameter(long.class, "update")
.returns(boolean.class)
.addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
varName, "expect", UNSAFE_ACCESS, offsetName(varName), "expect", "update")
.build();
context.nodeSubtype
.addMethod(getter)
.addMethod(setter);
.addMethod(setter)
.addMethod(cas);
}

private void addAccessExpiration() {
Expand Down Expand Up @@ -132,8 +143,8 @@ private void addRefreshExpiration() {
.addParameter(long.class, "expect")
.addParameter(long.class, "update")
.returns(boolean.class)
.addStatement("return $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update")
.addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
"writeTime", "expect", UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update")
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,35 @@ long expireAfterRead(Node<K, V> node, @Nullable K key,
return 0L;
}

/**
* Attempts to update the access time for the entry after a read.
*
* @param node the entry in the page replacement policy
* @param key the key of the entry that was read
* @param value the value of the entry that was read
* @param expiry the calculator for the expiration time
* @param now the current time, in nanoseconds
*/
void tryExpireAfterRead(Node<K, V> node, @Nullable K key,
@Nullable V value, Expiry<K, V> expiry, long now) {
if (!expiresVariable() || (key == null) || (value == null)) {
return;
}

long variableTime = node.getVariableTime();
long currentDuration = Math.max(1, variableTime - now);
if (isAsync && (currentDuration > MAXIMUM_EXPIRY)) {
// expireAfterCreate has not yet set the duration after completion
return;
}

long duration = expiry.expireAfterRead(key, value, now, currentDuration);
if (duration != currentDuration) {
long expirationTime = isAsync ? (now + duration) : (now + Math.min(duration, MAXIMUM_EXPIRY));
node.casVariableTime(variableTime, expirationTime);
}
}

void setVariableTime(Node<K, V> node, long expirationTime) {
if (expiresVariable()) {
node.setVariableTime(expirationTime);
Expand Down Expand Up @@ -1823,7 +1852,7 @@ public boolean containsValue(Object value) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setAccessTime(node, now);
setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now));
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
afterRead(node, now, recordStats);
return value;
Expand Down Expand Up @@ -1862,7 +1891,7 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
if (!isComputingAsync(node)) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now));
tryExpireAfterRead(node, castedKey, value, expiry(), now);
setAccessTime(node, now);
}
afterRead(node, now, /* recordHit */ false);
Expand Down Expand Up @@ -2274,7 +2303,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
V value = node.getValue();
if ((value != null) && !hasExpired(node, now)) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, value, expiry(), now));
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}

Expand Down Expand Up @@ -2362,7 +2391,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
}
if (newValue[0] == null) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, oldValue[0], expiry(), now[0]));
tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}

Expand Down Expand Up @@ -2547,7 +2576,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
} else {
if (cause[0] == null) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, newValue[0], expiry(), now[0]));
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
} else if (cause[0] == RemovalCause.COLLECTED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ public long getVariableTime() {
*/
public void setVariableTime(long time) {}

/**
* Atomically sets the variable time to the given updated value if the current value equals the
* expected value and returns if the update was successful.
*/
public boolean casVariableTime(long expect, long update) {
throw new UnsupportedOperationException();
}

@GuardedBy("evictionLock")
public Node<K, V> getPreviousInVariableOrder() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2019 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.cache.issues;

import static com.github.benmanes.caffeine.testing.Awaits.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Policy.VarExpiration;

/**
* Issue #298: Stale data when using Expiry
* <p>
* When a future value in an AsyncCache is in-flight, the entry has an infinite expiration time to
* disable eviction. When it completes, a callback performs a no-op write into the cache to
* update its metadata (expiration, weight, etc). This may race with a reader who obtains a
* completed future, reads the current duration as infinite, and tries to set the expiration time
* accordingly (to indicate no change). If the writer completes before the reader updates, then we
* encounter an ABA problem where the entry is set to never expire.
*
* @author [email protected] (Ben Manes)
*/
@Test(groups = "isolated")
public final class Issue298Test {
static final long EXPIRE_NS = Duration.ofDays(1).toNanos();

AtomicBoolean startedLoad;
AtomicBoolean doLoad;

AtomicBoolean startedCreate;
AtomicBoolean doCreate;

AtomicBoolean startedRead;
AtomicBoolean doRead;
AtomicBoolean endRead;

AsyncLoadingCache<String, String> cache;
VarExpiration<String, String> policy;
String key;

@BeforeMethod
public void before() {
startedCreate = new AtomicBoolean();
startedLoad = new AtomicBoolean();
startedRead = new AtomicBoolean();
doCreate = new AtomicBoolean();
endRead = new AtomicBoolean();
doLoad = new AtomicBoolean();
doRead = new AtomicBoolean();

key = "key";
cache = makeAsyncCache();
policy = cache.synchronous().policy().expireVariably().get();
}

@AfterMethod
public void after() {
endRead.set(true);
}

@Test
public void readDuringCreate() {
// Loaded value and waiting at expireAfterCreate (expire: infinite)
cache.get(key);
await().untilTrue(startedLoad);
doLoad.set(true);
await().untilTrue(startedCreate);

// Async read trying to wait at expireAfterRead
CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> {
do {
cache.get(key);
} while (!endRead.get());
});

// Ran expireAfterCreate (expire: infinite -> create)
doCreate.set(true);
await().until(() -> policy.getExpiresAfter(key).get().toNanos() <= EXPIRE_NS);
await().untilTrue(startedRead);

// Ran reader (expire: create -> ?)
doRead.set(true);
endRead.set(true);
reader.join();

// Ensure expire is [expireAfterCreate], not [infinite]
assertThat(policy.getExpiresAfter(key).get().toNanos(), is(lessThanOrEqualTo(EXPIRE_NS)));
}

private AsyncLoadingCache<String, String> makeAsyncCache() {
return Caffeine.newBuilder()
.expireAfter(new Expiry<String, String>() {
@Override public long expireAfterCreate(@Nonnull String key,
@Nonnull String value, long currentTime) {
startedCreate.set(true);
await().untilTrue(doCreate);
return EXPIRE_NS;
}
@Override public long expireAfterUpdate(@Nonnull String key,
@Nonnull String value, long currentTime, long currentDuration) {
return currentDuration;
}
@Override public long expireAfterRead(@Nonnull String key,
@Nonnull String value, long currentTime, long currentDuration) {
startedRead.set(true);
await().untilTrue(doRead);
return currentDuration;
}
})
.buildAsync(key -> {
startedLoad.set(true);
await().untilTrue(doLoad);
return key + "'s value";
});
}
}

0 comments on commit 801e532

Please sign in to comment.