From 419cfe7289d94f5bd394215e684f8cde7454c8a1 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 12 Jun 2016 19:41:19 -0700 Subject: [PATCH] Assist maintenance task when executor is exhausted (fixes #90) The write buffer is used to allow writers to update the eviction policy in a non-blocking manner. The maintenance work is delegated to an async task when possible to minimize request latencies. Previous iterations (Guava, CLHM) amortized it on the calling thread due to not having a system-wide executor to take advantage of. Previously when the write buffer was full the writing threads would spin, yield, and wait for the maintenance task to catch up. This was under the assumption that the task was running but starved out due to synthetic load testing, e.g. running `cache.put` with more threads than cores. The belief was that the write buffer would be full under normal usage and the maintenance task would be scheduled promptly. This assumption fails for workloads where every worker in the executor is updating the cache. This can happen in a synthetic refresh test, but also with an AsyncLoadingCache when futures complete. In that case the maintenance task is scheduled but unable to run, and all of the worker threads are spinnining endlessly trying to append to the write buffer. In this case we degrade to amortize the maintenance work on the caller. This allows progress to be made, avoids wasteful busy waiting, and should not increase the response penalty in most cases. That is because writers would have had to wait anyway and this would typically happen only on asynchronous non-user facing tasks (completers, refresh). This also removes the ugly Thread.yield() hack, which did look unnatural. Thanks goes to @DougLea for identifying the oversight that the executor may exhaust its threads, causing this problem. --- .../caffeine/cache/BoundedLocalCache.java | 41 ++++++++------ .../caffeine/cache/BoundedLocalCacheTest.java | 19 +++++++ .../benmanes/caffeine/cache/Stresser.java | 56 ++++++++++++------- gradle/dependencies.gradle | 6 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 5 files changed, 83 insertions(+), 41 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 20362fc07d..587d04b00a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -899,23 +899,23 @@ void afterWrite(@Nullable Node node, Runnable task, long now) { node.setWriteTime(now); } if (buffersWrites()) { - boolean submitted = false; - for (;;) { - for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) { - submitted = writeBuffer().offer(task); - if (submitted) { - break; - } - scheduleDrainBuffers(); - } - if (submitted) { - break; - } else { - Thread.yield(); + for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) { + if (writeBuffer().offer(task)) { + scheduleAfterWrite(); + return; } + scheduleDrainBuffers(); + } + + // The maintenance task may be scheduled but not running due to all of the executor's threads + // being busy. If all of the threads are writing into the cache then no progress can be made + // without assistance. + try { + performCleanUp(task); + } catch (RuntimeException e) { + logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e); } } - scheduleAfterWrite(); } /** @@ -965,7 +965,7 @@ void scheduleDrainBuffers() { executor().execute(drainBuffersTask); } catch (Throwable t) { logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t); - performCleanUp(); + performCleanUp(/* ignored */ null); } finally { evictionLock.unlock(); } @@ -975,7 +975,7 @@ void scheduleDrainBuffers() { @Override public void cleanUp() { try { - performCleanUp(); + performCleanUp(/* ignored */ null); } catch (RuntimeException e) { logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e); } @@ -985,11 +985,16 @@ public void cleanUp() { * Performs the maintenance work, blocking until the lock is acquired, and sets the state flags * to avoid excess scheduling attempts. Any exception thrown, such as by * {@link CacheWriter#delete()}, is propagated to the caller. + * + * @param task an additional pending task to run, or {@code null} if not present */ - void performCleanUp() { + void performCleanUp(@Nullable Runnable task) { evictionLock.lock(); try { lazySetDrainStatus(PROCESSING_TO_IDLE); + if (task != null) { + task.run(); + } maintenance(); } finally { if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) { @@ -2827,7 +2832,7 @@ public boolean exec() { @Override public void run() { - performCleanUp(); + performCleanUp(/* ignored */ null); } /** diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 36937059cc..7dd890e1bb 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -28,6 +28,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.List; import java.util.Map; @@ -370,6 +372,23 @@ public void fastpath(Cache cache, CacheContext context) { assertThat(localCache.readBuffer.reads(), is(1)); } + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.FULL, maximumSize = Maximum.FULL) + public void afterWrite_drainFullWriteBuffer(Cache cache, CacheContext context) { + BoundedLocalCache localCache = asBoundedLocalCache(cache); + Runnable task = Mockito.mock(Runnable.class); + localCache.drainStatus = PROCESSING_TO_IDLE; + int expectedCount = 1; + + while (localCache.writeBuffer().offer(task)) { + expectedCount++; + } + + localCache.afterWrite(null, task, 0L); + verify(task, times(expectedCount)).run(); + } + @Test(dataProvider = "caches") @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, population = Population.FULL, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index c7bacb3150..c4ee4c1c34 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -32,27 +32,40 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * A stress test to observe if the cache has a memory leak by not being able to drain the buffers - * fast enough. + * A stress test to observe if the cache is able to able to drain the buffers fast enough under a + * synthetic load. * * @author ben.manes@gmail.com (Ben Manes) */ public final class Stresser { private static final String[] STATUS = { "Idle", "Required", "Processing -> Idle", "Processing -> Required" }; - private static final int THREADS = 2 * Runtime.getRuntime().availableProcessors(); + private static final int MAX_THREADS = 2 * Runtime.getRuntime().availableProcessors(); private static final int WRITE_MAX_SIZE = (1 << 12); private static final int TOTAL_KEYS = (1 << 20); private static final int MASK = TOTAL_KEYS - 1; private static final int STATUS_INTERVAL = 5; private final BoundedLocalCache local; - private final Cache cache; + private final LoadingCache cache; + private final Stopwatch stopwatch; private final Integer[] ints; - private final int maximum; - private final Stopwatch stopwatch; - private final boolean reads = false; + private enum Operation { + READ(MAX_THREADS, TOTAL_KEYS), + WRITE(MAX_THREADS, WRITE_MAX_SIZE), + REFRESH(1, WRITE_MAX_SIZE); + + private final int maxThreads; + private final int maxEntries; + + private Operation(int maxThreads, int maxEntries) { + this.maxThreads = maxThreads; + this.maxEntries = maxEntries; + } + } + + private static final Operation operation = Operation.REFRESH; public Stresser() { ThreadFactory threadFactory = new ThreadFactoryBuilder() @@ -61,11 +74,10 @@ public Stresser() { .build(); Executors.newSingleThreadScheduledExecutor(threadFactory) .scheduleAtFixedRate(this::status, STATUS_INTERVAL, STATUS_INTERVAL, SECONDS); - maximum = reads ? TOTAL_KEYS : WRITE_MAX_SIZE; cache = Caffeine.newBuilder() - .maximumSize(maximum) + .maximumSize(operation.maxEntries) .recordStats() - .build(); + .build(key -> key); local = (BoundedLocalCache) cache.asMap(); ints = new Integer[TOTAL_KEYS]; Arrays.setAll(ints, key -> { @@ -78,15 +90,20 @@ public Stresser() { } public void run() throws InterruptedException { - ConcurrentTestHarness.timeTasks(THREADS, () -> { + ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); for (;;) { Integer key = ints[index++ & MASK]; - if (reads) { - cache.getIfPresent(key); - } else { - cache.put(key, key); - //Thread.yield(); + switch (operation) { + case READ: + cache.getIfPresent(key); + break; + case WRITE: + cache.put(key, key); + break; + case REFRESH: + cache.refresh(key); + break; } } }); @@ -95,14 +112,15 @@ public void run() throws InterruptedException { private void status() { local.evictionLock.lock(); int pendingWrites = local.writeBuffer().size(); + int drainStatus = local.drainStatus(); local.evictionLock.unlock(); LocalTime elapsedTime = LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS)); System.out.printf("---------- %s ----------%n", elapsedTime); System.out.printf("Pending reads: %,d; writes: %,d%n", local.readBuffer.size(), pendingWrites); - System.out.printf("Drain status = %s%n", STATUS[local.drainStatus]); + System.out.printf("Drain status = %s (%s)%n", STATUS[drainStatus], drainStatus); System.out.printf("Evictions = %,d%n", cache.stats().evictionCount()); - System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), maximum); + System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), operation.maxEntries); System.out.printf("Lock = [%s%n", StringUtils.substringAfter( local.evictionLock.toString(), "[")); System.out.printf("Pending tasks = %,d%n", @@ -121,4 +139,4 @@ private void status() { public static void main(String[] args) throws Exception { new Stresser().run(); } -} +} \ No newline at end of file diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7a847dad2d..0cf2a7d1bc 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -36,7 +36,7 @@ ext { jcache: '1.0.0', jsr305: '3.0.1', stream: '2.9.2', - univocity_parsers: '2.1.1', + univocity_parsers: '2.1.2', ycsb: '0.9.0', xz: '1.5', ] @@ -47,7 +47,7 @@ ext { jcache_tck: '1.0.1', jctools: '1.2', junit: '4.12', - mockito: '2.0.54-beta', + mockito: '2.0.55-beta', pax_exam: '4.9.1', testng: '6.9.11', truth: '0.24', @@ -59,7 +59,7 @@ ext { ehcache3: '3.0.2', elastic_search: '5.0.0-alpha3', infinispan: '9.0.0.Alpha2', - jackrabbit: '1.5.2', + jackrabbit: '1.5.3', jamm: '0.3.1', java_object_layout: '0.5', koloboke: '0.6.8', diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index c93683f4f8..1bc8925a1c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-rc-2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-rc-6-bin.zip