From a1a5de80779780dc3844b473fe14b55180572cbf Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Wed, 30 Jun 2021 21:57:44 -0700 Subject: [PATCH] Improve robustness in racy scenarios (fixes #568) 1. When an entry is updated then a concurrent reader should observe either the old or new value. This operation replaces the j.l.Reference instance stored on the entry and the old referent becomes eligible for garbage collection. A reader holding the stale Reference may therefore return a null value, which is more likely due to the cache proactively clearing the referent to assist the garbage collector. When a null value is read then an extra volatile read is used to validate that the Reference instance is still held by the entry. This retry loop has negligible cost. 2. When an entry is eligible for removal due to its value being garbage collected, then during the eviction's atomic map operation this eligibility must be verified. If concurrently the entry was resurrected and a new value set, then the cache writer has already dispatched the removal notification and established a live mapping. If the evictor does not detect that the cause is no longer valid, then it would incorrectly discard the mapping with a removal notification containing a non-null key, non-null value, and collected removal cause. Like expiration and size policies, the reference eviction policy will now validate and no-op if the entry is no longer eligible. 3. When the fixed expiration setting is dynamically adjusted, an expired entry may be resurrected as no longer eligible for removal. While the map operation detected this case, stemming from the entry itself being updated and its lifetime reset, the outer eviction loop could retry indefinitely due to a stale read of the fixed duration. This caused the loop to retry the ineligible entry, but instead it can terminate when eviction fails because it scans a queue ordered by the expiration timestamp. 
Co-authored-by: Justin Horvitz --- .../caffeine/cache/node/AddValue.java | 29 ++- .../caffeine/cache/BoundedLocalCache.java | 24 +- .../caffeine/cache/BoundedLocalCacheTest.java | 238 ++++++++++++++++++ .../caffeine/cache/issues/Issue568Test.java | 128 ++++++++++ checksum.xml | 13 + 5 files changed, 422 insertions(+), 10 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java index 9298667655..54774d7a56 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java @@ -26,6 +26,7 @@ import javax.lang.model.element.Modifier; +import com.squareup.javapoet.CodeBlock; import com.squareup.javapoet.FieldSpec; import com.squareup.javapoet.MethodSpec; @@ -47,7 +48,7 @@ protected void execute() { context.nodeSubtype .addField(newFieldOffset(context.className, "value")) .addField(newValueField()) - .addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.LAZY)) + .addMethod(makeGetValue()) .addMethod(newGetRef("value")) .addMethod(makeSetValue()) .addMethod(makeContainsValue()); @@ -60,6 +61,29 @@ private FieldSpec newValueField() { return fieldSpec.build(); } + /** Creates the getValue method. 
*/ + private MethodSpec makeGetValue() { + MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue") + .addModifiers(context.publicFinalModifiers()) + .returns(vTypeVar); + if (valueStrength() == Strength.STRONG) { + getter.addStatement("return value"); + return getter.build(); + } + + CodeBlock code = CodeBlock.builder() + .beginControlFlow("for (;;)") + .addStatement("$1T ref = ($1T) $2T.UNSAFE.getObject(this, $3N)", + Reference.class, UNSAFE_ACCESS, offsetName("value")) + .addStatement("V referent = ref.get()") + .beginControlFlow("if ((referent != null) || (ref == value))") + .addStatement("return referent") + .endControlFlow() + .endControlFlow() + .build(); + return getter.addCode(code).build(); + } + /** Creates the setValue method. */ private MethodSpec makeSetValue() { MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue") @@ -71,9 +95,10 @@ private MethodSpec makeSetValue() { setter.addStatement("$T.UNSAFE.putObject(this, $N, $N)", UNSAFE_ACCESS, offsetName("value"), "value"); } else { - setter.addStatement("(($T) getValueReference()).clear()", Reference.class); + setter.addStatement("$1T ref = ($1T) getValueReference()", Reference.class); setter.addStatement("$T.UNSAFE.putObject(this, $N, new $T($L, $N, referenceQueue))", UNSAFE_ACCESS, offsetName("value"), valueReferenceType(), "getKeyReference()", "value"); + setter.addStatement("ref.clear()"); } return setter.build(); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index c0e50da3b4..e5c2d65237 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -822,10 +822,10 @@ void expireAfterAccessEntries(AccessOrderDeque> accessOrderDeque, lon long duration = expiresAfterAccessNanos(); for (;;) { Node node = accessOrderDeque.peekFirst(); - 
+ if ((node == null) || ((now - node.getAccessTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { return; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -837,11 +837,11 @@ void expireAfterWriteEntries(long now) { } long duration = expiresAfterWriteNanos(); for (;;) { - final Node node = writeOrderDeque().peekFirst(); - if ((node == null) || ((now - node.getWriteTime()) < duration)) { + Node node = writeOrderDeque().peekFirst(); + if ((node == null) || ((now - node.getWriteTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { break; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -894,8 +894,8 @@ boolean hasExpired(Node node, long now) { } /** - * Attempts to evict the entry based on the given removal cause. A removal due to expiration or - * size may be ignored if the entry was updated and is no longer eligible for eviction. + * Attempts to evict the entry based on the given removal cause. A removal may be ignored + * if the entry was updated and is no longer eligible for eviction. * * @param node the entry to evict * @param cause the reason to evict @@ -919,7 +919,15 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { synchronized (n) { value[0] = n.getValue(); - actualCause[0] = (key == null) || (value[0] == null) ? 
RemovalCause.COLLECTED : cause; + if ((key == null) || (value[0] == null)) { + actualCause[0] = RemovalCause.COLLECTED; + } else if (cause == RemovalCause.COLLECTED) { + resurrect[0] = true; + return n; + } else { + actualCause[0] = cause; + } + if (actualCause[0] == RemovalCause.EXPIRED) { boolean expired = false; if (expiresAfterAccess()) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 9d3926ec32..c0b175db0d 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -45,6 +45,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.Thread.State; +import java.lang.ref.Reference; +import java.time.Duration; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -52,6 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.mockito.Mockito; @@ -454,6 +459,239 @@ public void clear_update() { await().untilAtomic(removedValues, is(oldValue + newValue)); } + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT}, + removalListener = Listener.CONSUMING) + public void evict_resurrect_collected(Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + Integer oldValue = Integer.valueOf(2); + Integer newValue = Integer.valueOf(3); + BoundedLocalCache localCache = asBoundedLocalCache(cache); + + cache.put(key, oldValue); + Node node = 
localCache.data.get( + localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue())); + @SuppressWarnings("unchecked") + Reference ref = (Reference) node.getValueReference(); + ref.enqueue(); + ref.clear(); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + cache.asMap().compute(key, (k, v) -> { + assertThat(v, is(nullValue())); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return newValue; + }); + await().untilTrue(done); + + assertThat(node.getValue(), is(newValue)); + assertThat(context.removalNotifications(), is(equalTo(ImmutableList.of( + new RemovalNotification<>(key, null, RemovalCause.COLLECTED))))); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE, + weigher = CacheWeigher.COLLECTION) + public void evict_resurrect_weight(Cache> cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, ImmutableList.of(key)); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().eviction().get().setMaximum(0); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return ImmutableList.of(); + }); + await().untilTrue(done); + + 
assertThat(cache.getIfPresent(key), is(ImmutableList.of())); + verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, mustExpireWithAnyOf = {AFTER_ACCESS, AFTER_WRITE}, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}) + public void evict_resurrect_expireAfter(Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return -key; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(-key)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterAccess = Expire.FOREVER) + public void evict_resurrect_expireAfterAccess( + Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + 
started.set(true); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.FOREVER) + public void evict_resurrect_expireAfterWrite( + Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterWrite_entry( + Cache cache, CacheContext 
context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return -key; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(-key)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterVar(Cache cache, CacheContext context) { + BoundedLocalCache localCache = asBoundedLocalCache(cache); + Integer key = Integer.valueOf(1); + cache.put(key, key); + Node node = localCache.data.get( + localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue())); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + synchronized (node) { + context.ticker().advance(Duration.ofHours(1)); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1)); + } + await().untilTrue(done); + + 
assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + @Test(dataProvider = "caches") @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, population = Population.FULL, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java new file mode 100644 index 0000000000..3175d389be --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import java.lang.ref.Reference; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; + +/** + * Issue #568: Incorrect handling of weak/soft reference caching. + * + * @author jhorvitz@google.com (Justin Horvitz) + */ +@Test(groups = "isolated") +public class Issue568Test { + + /** + * When an entry is updated then a concurrent reader should observe either the old or new value. 
+ * This operation replaces the {@link Reference} instance stored on the entry and the old referent + * becomes eligible for garbage collection. A reader holding the stale Reference may therefore + * return a null value, which is more likely due to the cache proactively clearing the referent to + * assist the garbage collector. + */ + @Test + public void intermittentNull() throws InterruptedException { + Cache cache = Caffeine.newBuilder().weakValues().build(); + + String key = "key"; + String val = "val"; + cache.put("key", "val"); + + AtomicReference error = new AtomicReference<>(); + List threads = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + int name = i; + Thread thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (Math.random() < .5) { + cache.put(key, val); + } else if (cache.getIfPresent(key) == null) { + error.compareAndSet(null, new IllegalStateException( + "Thread " + name + " observed null on iteration " + j)); + break; + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } + + /** + * When an entry is eligible for removal due to its value being garbage collected, then during the + * eviction's atomic map operation this eligibility must be verified. If concurrently the entry + * was resurrected and a new value set, then the cache writer has already dispatched the removal + * notification and established a live mapping. If the evictor does not detect that the cause is + * no longer valid, then it would incorrectly discard the mapping with a removal notification + * containing a non-null key, non-null value, and collected removal cause. 
+ */ + @Test + public void resurrect() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + Cache cache = Caffeine.newBuilder() + .weakValues() + .removalListener((k, v, cause) -> { + if (cause == RemovalCause.COLLECTED && (v != null)) { + error.compareAndSet(null, new IllegalStateException("Evicted a live value: " + v)); + } + }).build(); + + String key = "key"; + cache.put(key, new Object()); + + AtomicBoolean missing = new AtomicBoolean(); + List threads = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Thread thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (error.get() != null) { + break; + } + if (Math.random() < .01) { + System.gc(); + cache.cleanUp(); + } else if ((cache.getIfPresent(key) == null) && !missing.getAndSet(true)) { + cache.put(key, new Object()); + missing.set(false); + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } +} diff --git a/checksum.xml b/checksum.xml index ce1b3fa771..fdb1f25ff5 100644 --- a/checksum.xml +++ b/checksum.xml @@ -8,6 +8,7 @@ + @@ -361,6 +362,9 @@ 3C76A1066218F9C54B83D167E34B563C09BA2F72DB330F5843B2324DD2E40EE1F7E7887416B9C7A8DC03C07223E42216B76569F5D3962B911F922A6963C0C917 + + 9AEC8CAD28C018AEA65F3C2C57FD0D4B82C00A8F6C778F6DBAA61C961AF3A7C369655F5A5E93B360FAE297ADFA3205BC40EAF16847B6EDB56F01DF9AD655A18C + 1A47AAF2442159C1CBD22521F31C74B4C71C4168AF5B22D04B4691FDD286E90F02B2792DEDAD3EEEC12B5034ADA1A9EE751C975B5A169AE0B33EE800A8D96E7F @@ -406,6 +410,15 @@ 7245456641F2BF87960CF859D31827EBF937739932C59E135635579199122A5763F437A19DED6B9CCAAF9E84FD698A675F60427138EF35C10BF40BBC3E7B68DA + + A4C943D4945189F4EB883D764E63CF5C5F8F2A17EA1BBFE6C3F05A4B28E1E011D0358246618A39EC705E7CEE60D30ABFDD82A36A621BC7A3F5455E47D0EDAF2B + + + 7F18348A8EDC88BB4B0D6EB5412BC4F6D79D1E5843F28CA72F717BE50BF0BF6A19AE1C3416E5EAB40CC1FD27C697CB0F9B5E10A51CFA574093EF0A4F57CB93CF + 
+ + 5EA9CA94A3682E090E28895ECAAE1E020C48DD249EE5040FB6EBE4B01DA027B86F94450E30692253DFA787371D4B4286FE257CEBF02E184AC149A746952D669C + 779F0D784A11834392C65DA375CF5F9612FD89B0540C665BCE1009EFAA7C35642E38D381AF7362703378306C95D24669B02CA27A2CCAFB574173BF9FA273F625