Skip to content

Commit

Permalink
Protect reference caches from a discarding executor
Browse files Browse the repository at this point in the history
Backport of f143764
  • Loading branch information
ben-manes committed Dec 2, 2021
1 parent 336ef93 commit 8c7160d
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,6 @@ public static boolean usesAccessOrderMainDeque(Set<Feature> features) {
|| features.contains(Feature.MAXIMUM_WEIGHT);
}

public static boolean usesWriteQueue(Set<Feature> features) {
return features.contains(Feature.MAXIMUM_SIZE)
|| features.contains(Feature.MAXIMUM_WEIGHT)
|| features.contains(Feature.EXPIRE_ACCESS)
|| features.contains(Feature.EXPIRE_WRITE)
|| features.contains(Feature.REFRESH_WRITE);
}

public static boolean useWriteTime(Set<Feature> features) {
return features.contains(Feature.EXPIRE_WRITE)
|| features.contains(Feature.REFRESH_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import com.github.benmanes.caffeine.cache.local.AddRemovalListener;
import com.github.benmanes.caffeine.cache.local.AddStats;
import com.github.benmanes.caffeine.cache.local.AddSubtype;
import com.github.benmanes.caffeine.cache.local.AddWriteBuffer;
import com.github.benmanes.caffeine.cache.local.Finalize;
import com.github.benmanes.caffeine.cache.local.LocalCacheContext;
import com.github.benmanes.caffeine.cache.local.LocalCacheRule;
Expand Down Expand Up @@ -90,7 +89,7 @@ public final class LocalCacheFactoryGenerator {
new AddKeyValueStrength(), new AddRemovalListener(), new AddStats(),
new AddExpirationTicker(), new AddMaximum(), new AddFastPath(), new AddDeques(),
new AddExpireAfterAccess(), new AddExpireAfterWrite(), new AddRefreshAfterWrite(),
new AddWriteBuffer(), new AddPacer(), new Finalize());
new AddPacer(), new Finalize());
final ZoneId timeZone = ZoneId.of("America/Los_Angeles");
final Path directory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public final class Specifications {
public static final TypeName WRITE_ORDER_DEQUE =
ParameterizedTypeName.get(ClassName.get(PACKAGE_NAME, "WriteOrderDeque"), NODE);

public static final ClassName WRITE_QUEUE_TYPE =
ClassName.get(PACKAGE_NAME, "MpscGrowableArrayQueue");
public static final TypeName WRITE_QUEUE =
ParameterizedTypeName.get(WRITE_QUEUE_TYPE, ClassName.get(Runnable.class));

public static final TypeName EXPIRY = ParameterizedTypeName.get(
ClassName.get(PACKAGE_NAME, "Expiry"), kTypeVar, vTypeVar);
public static final TypeName TIMER_WHEEL = ParameterizedTypeName.get(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
/** The maximum duration before an entry expires. */
static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years

final MpscGrowableArrayQueue<Runnable> writeBuffer;
final ConcurrentHashMap<Object, Node<K, V>> data;
@Nullable final CacheLoader<K, V> cacheLoader;
final PerformCleanupTask drainBuffersTask;
Expand Down Expand Up @@ -244,6 +245,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
? new BoundedBuffer<>()
: Buffer.disabled();
accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
writeBuffer = new MpscGrowableArrayQueue<>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

if (evicts()) {
setMaximumSize(builder.getMaximum());
Expand Down Expand Up @@ -277,15 +279,6 @@ protected WriteOrderDeque<Node<K, V>> writeOrderDeque() {
throw new UnsupportedOperationException();
}

/** If the page replacement policy buffers writes. */
protected boolean buffersWrites() {
return false;
}

protected MpscGrowableArrayQueue<Runnable> writeBuffer() {
throw new UnsupportedOperationException();
}

@Override
public final Executor executor() {
return executor;
Expand Down Expand Up @@ -1372,25 +1365,23 @@ void setAccessTime(Node<K, V> node, long now) {
* @param task the pending operation to be applied
*/
void afterWrite(Runnable task) {
if (buffersWrites()) {
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer().offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
}

// The maintenance task may be scheduled but not running due to all of the executor's threads
// being busy. If all of the threads are writing into the cache then no progress can be made
// without assistance.
try {
performCleanUp(task);
} catch (RuntimeException e) {
logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
}
} else {
scheduleAfterWrite();
// The maintenance task may be scheduled but not running due. This might occur due to all of the
// executor's threads being busy (perhaps writing into this cache), the write rate greatly
// exceeds the consuming rate, priority inversion, or if the executor silently discarded the
// maintenance task. In these scenarios then the writing threads cannot make progress and
// instead writers provide assistance by performing this work directly.
try {
performCleanUp(task);
} catch (RuntimeException e) {
logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
}
}

Expand Down Expand Up @@ -1604,12 +1595,8 @@ static <K, V> void reorder(LinkedDeque<Node<K, V>> deque, Node<K, V> node) {
/** Drains the write buffer. */
@GuardedBy("evictionLock")
void drainWriteBuffer() {
if (!buffersWrites()) {
return;
}

for (int i = 0; i < WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer().poll();
for (int i = 0; i <= WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer.poll();
if (task == null) {
return;
}
Expand Down Expand Up @@ -1823,7 +1810,7 @@ public void clear() {

// Apply all pending writes
Runnable task;
while (buffersWrites() && (task = writeBuffer().poll()) != null) {
while ((task = writeBuffer.poll()) != null) {
task.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -290,8 +289,8 @@ int getInitialCapacity() {
* with {@link #removalListener} or utilize asynchronous computations. A test may instead prefer
* to configure the cache to execute tasks directly on the same thread.
* <p>
* Beware that configuring a cache with an executor that throws {@link RejectedExecutionException}
* may experience non-deterministic behavior.
* Beware that configuring a cache with an executor that discards tasks or never runs them may
* experience non-deterministic behavior.
*
* @param executor the executor to use for asynchronous execution
* @return this {@code Caffeine} instance (for chaining)
Expand Down
Loading

0 comments on commit 8c7160d

Please sign in to comment.