diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java index 6e147d00e8..45b8e35b0c 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java @@ -86,17 +86,28 @@ private void addLink(String method, String varName) { private void addVariableTime(String varName) { MethodSpec getter = MethodSpec.methodBuilder("getVariableTime") .addModifiers(Modifier.PUBLIC) - .addStatement("return $N", varName) + .addStatement("return $T.UNSAFE.getLong(this, $N)", + UNSAFE_ACCESS, offsetName(varName)) .returns(long.class) .build(); MethodSpec setter = MethodSpec.methodBuilder("setVariableTime") .addModifiers(Modifier.PUBLIC) .addParameter(long.class, varName) - .addStatement("this.$N = $N", varName, varName) + .addStatement("$T.UNSAFE.putLong(this, $N, $N)", + UNSAFE_ACCESS, offsetName(varName), varName) + .build(); + MethodSpec cas = MethodSpec.methodBuilder("casVariableTime") + .addModifiers(Modifier.PUBLIC) + .addParameter(long.class, "expect") + .addParameter(long.class, "update") + .returns(boolean.class) + .addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", + varName, "expect", UNSAFE_ACCESS, offsetName(varName), "expect", "update") .build(); context.nodeSubtype .addMethod(getter) - .addMethod(setter); + .addMethod(setter) + .addMethod(cas); } private void addAccessExpiration() { @@ -132,8 +143,8 @@ private void addRefreshExpiration() { .addParameter(long.class, "expect") .addParameter(long.class, "update") .returns(boolean.class) - .addStatement("return $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", - UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update") + .addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", + "writeTime", "expect", UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update") .build()); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index f40016a10b..d39398219e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -1255,6 +1255,35 @@ long expireAfterRead(Node node, @Nullable K key, return 0L; } + /** + * Attempts to update the access time for the entry after a read. + * + * @param node the entry in the page replacement policy + * @param key the key of the entry that was read + * @param value the value of the entry that was read + * @param expiry the calculator for the expiration time + * @param now the current time, in nanoseconds + */ + void tryExpireAfterRead(Node node, @Nullable K key, + @Nullable V value, Expiry expiry, long now) { + if (!expiresVariable() || (key == null) || (value == null)) { + return; + } + + long variableTime = node.getVariableTime(); + long currentDuration = Math.max(1, variableTime - now); + if (isAsync && (currentDuration > MAXIMUM_EXPIRY)) { + // expireAfterCreate has not yet set the duration after completion + return; + } + + long duration = expiry.expireAfterRead(key, value, now, currentDuration); + if (duration != currentDuration) { + long expirationTime = isAsync ? (now + duration) : (now + Math.min(duration, MAXIMUM_EXPIRY)); + node.casVariableTime(variableTime, expirationTime); + } + } + void setVariableTime(Node node, long expirationTime) { if (expiresVariable()) { node.setVariableTime(expirationTime); @@ -1823,7 +1852,7 @@ public boolean containsValue(Object value) { @SuppressWarnings("unchecked") K castedKey = (K) key; setAccessTime(node, now); - setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now)); + tryExpireAfterRead(node, castedKey, value, expiry(), now); } afterRead(node, now, recordStats); return value; @@ -1862,7 +1891,7 @@ public Map getAllPresent(Iterable keys) { if (!isComputingAsync(node)) { @SuppressWarnings("unchecked") K castedKey = (K) key; - setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now)); + tryExpireAfterRead(node, castedKey, value, expiry(), now); setAccessTime(node, now); } afterRead(node, now, /* recordHit */ false); @@ -2274,7 +2303,7 @@ public void replaceAll(BiFunction function) { V value = node.getValue(); if ((value != null) && !hasExpired(node, now)) { if (!isComputingAsync(node)) { - setVariableTime(node, expireAfterRead(node, key, value, expiry(), now)); + tryExpireAfterRead(node, key, value, expiry(), now); setAccessTime(node, now); } @@ -2362,7 +2391,7 @@ public void replaceAll(BiFunction function) { } if (newValue[0] == null) { if (!isComputingAsync(node)) { - setVariableTime(node, expireAfterRead(node, key, oldValue[0], expiry(), now[0])); + tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]); setAccessTime(node, now[0]); } @@ -2547,7 +2576,7 @@ public void replaceAll(BiFunction function) { } else { if (cause[0] == null) { if (!isComputingAsync(node)) { - setVariableTime(node, expireAfterRead(node, key, newValue[0], expiry(), now[0])); + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); setAccessTime(node, now[0]); } } else if (cause[0] == RemovalCause.COLLECTED) { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java index 07b439a960..2df2b1e19e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java @@ -129,6 +129,14 @@ public long getVariableTime() { */ public void setVariableTime(long time) {} + /** + * Atomically sets the variable time to the given updated value if the current value equals the + * expected value and returns if the update was successful. + */ + public boolean casVariableTime(long expect, long update) { + throw new UnsupportedOperationException(); + } + @GuardedBy("evictionLock") public Node getPreviousInVariableOrder() { throw new UnsupportedOperationException(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java new file mode 100644 index 0000000000..49b57d53b0 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java @@ -0,0 +1,143 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static com.github.benmanes.caffeine.testing.Awaits.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nonnull; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import com.github.benmanes.caffeine.cache.Policy.VarExpiration; + +/** + * Issue #298: Stale data when using Expiry + *

+ * When a future value in an AsyncCache is in-flight, the entry has an infinite expiration time to + * disable eviction. When it completes, a callback performs a no-op write into the cache to + * update its metadata (expiration, weight, etc). This may race with a reader who obtains a + * completed future, reads the current duration as infinite, and tries to set the expiration time + * accordingly (to indicate no change). If the writer completes before the reader updates, then we + * encounter an ABA problem where the entry is set to never expire. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +@Test(groups = "isolated") +public final class Issue298Test { + static final long EXPIRE_NS = Duration.ofDays(1).toNanos(); + + AtomicBoolean startedLoad; + AtomicBoolean doLoad; + + AtomicBoolean startedCreate; + AtomicBoolean doCreate; + + AtomicBoolean startedRead; + AtomicBoolean doRead; + AtomicBoolean endRead; + + AsyncLoadingCache cache; + VarExpiration policy; + String key; + + @BeforeMethod + public void before() { + startedCreate = new AtomicBoolean(); + startedLoad = new AtomicBoolean(); + startedRead = new AtomicBoolean(); + doCreate = new AtomicBoolean(); + endRead = new AtomicBoolean(); + doLoad = new AtomicBoolean(); + doRead = new AtomicBoolean(); + + key = "key"; + cache = makeAsyncCache(); + policy = cache.synchronous().policy().expireVariably().get(); + } + + @AfterMethod + public void after() { + endRead.set(true); + } + + @Test + public void readDuringCreate() { + // Loaded value and waiting at expireAfterCreate (expire: infinite) + cache.get(key); + await().untilTrue(startedLoad); + doLoad.set(true); + await().untilTrue(startedCreate); + + // Async read trying to wait at expireAfterRead + CompletableFuture reader = CompletableFuture.runAsync(() -> { + do { + cache.get(key); + } while (!endRead.get()); + }); + + // Ran expireAfterCreate (expire: infinite -> create) + doCreate.set(true); + await().until(() -> policy.getExpiresAfter(key).get().toNanos() <= EXPIRE_NS); + await().untilTrue(startedRead); + + // Ran reader (expire: create -> ?) + doRead.set(true); + endRead.set(true); + reader.join(); + + // Ensure expire is [expireAfterCreate], not [infinite] + assertThat(policy.getExpiresAfter(key).get().toNanos(), is(lessThanOrEqualTo(EXPIRE_NS))); + } + + private AsyncLoadingCache makeAsyncCache() { + return Caffeine.newBuilder() + .expireAfter(new Expiry() { + @Override public long expireAfterCreate(@Nonnull String key, + @Nonnull String value, long currentTime) { + startedCreate.set(true); + await().untilTrue(doCreate); + return EXPIRE_NS; + } + @Override public long expireAfterUpdate(@Nonnull String key, + @Nonnull String value, long currentTime, long currentDuration) { + return currentDuration; + } + @Override public long expireAfterRead(@Nonnull String key, + @Nonnull String value, long currentTime, long currentDuration) { + startedRead.set(true); + await().untilTrue(doRead); + return currentDuration; + } + }) + .buildAsync(key -> { + startedLoad.set(true); + await().untilTrue(doLoad); + return key + "'s value"; + }); + } +}