From 801e532935dd822aab75e9625a520df7b62bb0b0 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sat, 23 Feb 2019 13:10:35 -0800 Subject: [PATCH] Fix async expiration race for create + read (fixes #298) When the future is in-flight, the expiration is disabled by using an infinite timestamp. When finished, a callback updates it through a no-op write and calls expireAfterCreate. If a read comes in after the future is done but before the timestamp is reset, it was allowed to set the value. This could cause an ABA race where the infinite timestamp was read, replaced, and written back. This fix now avoids that by using guards to skip the read and CAS when updating. --- .../caffeine/cache/node/AddExpiration.java | 21 ++- .../caffeine/cache/BoundedLocalCache.java | 39 ++++- .../github/benmanes/caffeine/cache/Node.java | 8 + .../caffeine/cache/issues/Issue298Test.java | 143 ++++++++++++++++++ 4 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java index 6e147d00e8..45b8e35b0c 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddExpiration.java @@ -86,17 +86,28 @@ private void addLink(String method, String varName) { private void addVariableTime(String varName) { MethodSpec getter = MethodSpec.methodBuilder("getVariableTime") .addModifiers(Modifier.PUBLIC) - .addStatement("return $N", varName) + .addStatement("return $T.UNSAFE.getLong(this, $N)", + UNSAFE_ACCESS, offsetName(varName)) .returns(long.class) .build(); MethodSpec setter = MethodSpec.methodBuilder("setVariableTime") .addModifiers(Modifier.PUBLIC) .addParameter(long.class, varName) - .addStatement("this.$N = $N", 
varName, varName) + .addStatement("$T.UNSAFE.putLong(this, $N, $N)", + UNSAFE_ACCESS, offsetName(varName), varName) + .build(); + MethodSpec cas = MethodSpec.methodBuilder("casVariableTime") + .addModifiers(Modifier.PUBLIC) + .addParameter(long.class, "expect") + .addParameter(long.class, "update") + .returns(boolean.class) + .addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", + varName, "expect", UNSAFE_ACCESS, offsetName(varName), "expect", "update") .build(); context.nodeSubtype .addMethod(getter) - .addMethod(setter); + .addMethod(setter) + .addMethod(cas); } private void addAccessExpiration() { @@ -132,8 +143,8 @@ private void addRefreshExpiration() { .addParameter(long.class, "expect") .addParameter(long.class, "update") .returns(boolean.class) - .addStatement("return $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", - UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update") + .addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)", + "writeTime", "expect", UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update") .build()); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index f40016a10b..d39398219e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -1255,6 +1255,35 @@ long expireAfterRead(Node node, @Nullable K key, return 0L; } + /** + * Attempts to update the access time for the entry after a read. 
+ * + * @param node the entry in the page replacement policy + * @param key the key of the entry that was read + * @param value the value of the entry that was read + * @param expiry the calculator for the expiration time + * @param now the current time, in nanoseconds + */ + void tryExpireAfterRead(Node node, @Nullable K key, + @Nullable V value, Expiry expiry, long now) { + if (!expiresVariable() || (key == null) || (value == null)) { + return; + } + + long variableTime = node.getVariableTime(); + long currentDuration = Math.max(1, variableTime - now); + if (isAsync && (currentDuration > MAXIMUM_EXPIRY)) { + // expireAfterCreate has not yet set the duration after completion + return; + } + + long duration = expiry.expireAfterRead(key, value, now, currentDuration); + if (duration != currentDuration) { + long expirationTime = isAsync ? (now + duration) : (now + Math.min(duration, MAXIMUM_EXPIRY)); + node.casVariableTime(variableTime, expirationTime); + } + } + void setVariableTime(Node node, long expirationTime) { if (expiresVariable()) { node.setVariableTime(expirationTime); @@ -1823,7 +1852,7 @@ public boolean containsValue(Object value) { @SuppressWarnings("unchecked") K castedKey = (K) key; setAccessTime(node, now); - setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now)); + tryExpireAfterRead(node, castedKey, value, expiry(), now); } afterRead(node, now, recordStats); return value; @@ -1862,7 +1891,7 @@ public Map getAllPresent(Iterable keys) { if (!isComputingAsync(node)) { @SuppressWarnings("unchecked") K castedKey = (K) key; - setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now)); + tryExpireAfterRead(node, castedKey, value, expiry(), now); setAccessTime(node, now); } afterRead(node, now, /* recordHit */ false); @@ -2274,7 +2303,7 @@ public void replaceAll(BiFunction function) { V value = node.getValue(); if ((value != null) && !hasExpired(node, now)) { if (!isComputingAsync(node)) { - setVariableTime(node, 
expireAfterRead(node, key, value, expiry(), now)); + tryExpireAfterRead(node, key, value, expiry(), now); setAccessTime(node, now); } @@ -2362,7 +2391,7 @@ public void replaceAll(BiFunction function) { } if (newValue[0] == null) { if (!isComputingAsync(node)) { - setVariableTime(node, expireAfterRead(node, key, oldValue[0], expiry(), now[0])); + tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]); setAccessTime(node, now[0]); } @@ -2547,7 +2576,7 @@ public void replaceAll(BiFunction function) { } else { if (cause[0] == null) { if (!isComputingAsync(node)) { - setVariableTime(node, expireAfterRead(node, key, newValue[0], expiry(), now[0])); + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); setAccessTime(node, now[0]); } } else if (cause[0] == RemovalCause.COLLECTED) { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java index 07b439a960..2df2b1e19e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Node.java @@ -129,6 +129,14 @@ public long getVariableTime() { */ public void setVariableTime(long time) {} + /** + * Atomically sets the variable time to the given updated value if the current value equals the + * expected value and returns if the update was successful. 
+ */ + public boolean casVariableTime(long expect, long update) { + throw new UnsupportedOperationException(); + } + @GuardedBy("evictionLock") public Node getPreviousInVariableOrder() { throw new UnsupportedOperationException(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java new file mode 100644 index 0000000000..49b57d53b0 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue298Test.java @@ -0,0 +1,143 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.benmanes.caffeine.cache.issues; + +import static com.github.benmanes.caffeine.testing.Awaits.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nonnull; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import com.github.benmanes.caffeine.cache.Policy.VarExpiration; + +/** + * Issue #298: Stale data when using Expiry + *

+ * When a future value in an AsyncCache is in-flight, the entry has an infinite expiration time to + * disable eviction. When it completes, a callback performs a no-op write into the cache to + * update its metadata (expiration, weight, etc). This may race with a reader who obtains a + * completed future, reads the current duration as infinite, and tries to set the expiration time + * accordingly (to indicate no change). If the writer completes before the reader updates, then we + * encounter an ABA problem where the entry is set to never expire. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +@Test(groups = "isolated") +public final class Issue298Test { + static final long EXPIRE_NS = Duration.ofDays(1).toNanos(); + + AtomicBoolean startedLoad; + AtomicBoolean doLoad; + + AtomicBoolean startedCreate; + AtomicBoolean doCreate; + + AtomicBoolean startedRead; + AtomicBoolean doRead; + AtomicBoolean endRead; + + AsyncLoadingCache cache; + VarExpiration policy; + String key; + + @BeforeMethod + public void before() { + startedCreate = new AtomicBoolean(); + startedLoad = new AtomicBoolean(); + startedRead = new AtomicBoolean(); + doCreate = new AtomicBoolean(); + endRead = new AtomicBoolean(); + doLoad = new AtomicBoolean(); + doRead = new AtomicBoolean(); + + key = "key"; + cache = makeAsyncCache(); + policy = cache.synchronous().policy().expireVariably().get(); + } + + @AfterMethod + public void after() { + endRead.set(true); + } + + @Test + public void readDuringCreate() { + // Loaded value and waiting at expireAfterCreate (expire: infinite) + cache.get(key); + await().untilTrue(startedLoad); + doLoad.set(true); + await().untilTrue(startedCreate); + + // Async read trying to wait at expireAfterRead + CompletableFuture reader = CompletableFuture.runAsync(() -> { + do { + cache.get(key); + } while (!endRead.get()); + }); + + // Ran expireAfterCreate (expire: infinite -> create) + doCreate.set(true); + await().until(() -> 
policy.getExpiresAfter(key).get().toNanos() <= EXPIRE_NS); + await().untilTrue(startedRead); + + // Ran reader (expire: create -> ?) + doRead.set(true); + endRead.set(true); + reader.join(); + + // Ensure expire is [expireAfterCreate], not [infinite] + assertThat(policy.getExpiresAfter(key).get().toNanos(), is(lessThanOrEqualTo(EXPIRE_NS))); + } + + private AsyncLoadingCache makeAsyncCache() { + return Caffeine.newBuilder() + .expireAfter(new Expiry() { + @Override public long expireAfterCreate(@Nonnull String key, + @Nonnull String value, long currentTime) { + startedCreate.set(true); + await().untilTrue(doCreate); + return EXPIRE_NS; + } + @Override public long expireAfterUpdate(@Nonnull String key, + @Nonnull String value, long currentTime, long currentDuration) { + return currentDuration; + } + @Override public long expireAfterRead(@Nonnull String key, + @Nonnull String value, long currentTime, long currentDuration) { + startedRead.set(true); + await().untilTrue(doRead); + return currentDuration; + } + }) + .buildAsync(key -> { + startedLoad.set(true); + await().untilTrue(doLoad); + return key + "'s value"; + }); + } +}