-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Assist maintenance task when executor is exhausted (fixes #90)
The write buffer is used to allow writers to update the eviction policy in a non-blocking manner. The maintenance work is delegated to an async task when possible to minimize request latencies. Previous iterations (Guava, CLHM) amortized it on the calling thread due to not having a system-wide executor to take advantage of. Previously when the write buffer was full the writing threads would spin, yield, and wait for the maintenance task to catch up. This was under the assumption that the task was running but starved out due to synthetic load testing, e.g. running `cache.put` with more threads than cores. The belief was that the write buffer would be full under normal usage and the maintenance task would be scheduled promptly. This assumption fails for workloads where every worker in the executor is updating the cache. This can happen in a synthetic refresh test, but also with an AsyncLoadingCache when futures complete. In that case the maintenance task is scheduled but unable to run, and all of the worker threads are spinnining endlessly trying to append to the write buffer. In this case we degrade to amortize the maintenance work on the caller. This allows progress to be made, avoids wasteful busy waiting, and should not increase the response penalty in most cases. That is because writers would have had to wait anyway and this would typically happen only on asynchronous non-user facing tasks (completers, refresh). This also removes the ugly Thread.yield() hack, which did look unnatural. Thanks goes to @DougLea for identifying the oversight that the executor may exhaust its threads, causing this problem.
- Loading branch information
Showing
5 changed files
with
83 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,27 +32,40 @@ | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
|
||
/** | ||
* A stress test to observe if the cache has a memory leak by not being able to drain the buffers | ||
* fast enough. | ||
* A stress test to observe if the cache is able to able to drain the buffers fast enough under a | ||
* synthetic load. | ||
* | ||
* @author [email protected] (Ben Manes) | ||
*/ | ||
public final class Stresser { | ||
private static final String[] STATUS = | ||
{ "Idle", "Required", "Processing -> Idle", "Processing -> Required" }; | ||
private static final int THREADS = 2 * Runtime.getRuntime().availableProcessors(); | ||
private static final int MAX_THREADS = 2 * Runtime.getRuntime().availableProcessors(); | ||
private static final int WRITE_MAX_SIZE = (1 << 12); | ||
private static final int TOTAL_KEYS = (1 << 20); | ||
private static final int MASK = TOTAL_KEYS - 1; | ||
private static final int STATUS_INTERVAL = 5; | ||
|
||
private final BoundedLocalCache<Integer, Integer> local; | ||
private final Cache<Integer, Integer> cache; | ||
private final LoadingCache<Integer, Integer> cache; | ||
private final Stopwatch stopwatch; | ||
private final Integer[] ints; | ||
|
||
private final int maximum; | ||
private final Stopwatch stopwatch; | ||
private final boolean reads = false; | ||
private enum Operation { | ||
READ(MAX_THREADS, TOTAL_KEYS), | ||
WRITE(MAX_THREADS, WRITE_MAX_SIZE), | ||
REFRESH(1, WRITE_MAX_SIZE); | ||
|
||
private final int maxThreads; | ||
private final int maxEntries; | ||
|
||
private Operation(int maxThreads, int maxEntries) { | ||
this.maxThreads = maxThreads; | ||
this.maxEntries = maxEntries; | ||
} | ||
} | ||
|
||
private static final Operation operation = Operation.REFRESH; | ||
|
||
public Stresser() { | ||
ThreadFactory threadFactory = new ThreadFactoryBuilder() | ||
|
@@ -61,11 +74,10 @@ public Stresser() { | |
.build(); | ||
Executors.newSingleThreadScheduledExecutor(threadFactory) | ||
.scheduleAtFixedRate(this::status, STATUS_INTERVAL, STATUS_INTERVAL, SECONDS); | ||
maximum = reads ? TOTAL_KEYS : WRITE_MAX_SIZE; | ||
cache = Caffeine.newBuilder() | ||
.maximumSize(maximum) | ||
.maximumSize(operation.maxEntries) | ||
.recordStats() | ||
.build(); | ||
.build(key -> key); | ||
local = (BoundedLocalCache<Integer, Integer>) cache.asMap(); | ||
ints = new Integer[TOTAL_KEYS]; | ||
Arrays.setAll(ints, key -> { | ||
|
@@ -78,15 +90,20 @@ public Stresser() { | |
} | ||
|
||
public void run() throws InterruptedException { | ||
ConcurrentTestHarness.timeTasks(THREADS, () -> { | ||
ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { | ||
int index = ThreadLocalRandom.current().nextInt(); | ||
for (;;) { | ||
Integer key = ints[index++ & MASK]; | ||
if (reads) { | ||
cache.getIfPresent(key); | ||
} else { | ||
cache.put(key, key); | ||
//Thread.yield(); | ||
switch (operation) { | ||
case READ: | ||
cache.getIfPresent(key); | ||
break; | ||
case WRITE: | ||
cache.put(key, key); | ||
break; | ||
case REFRESH: | ||
cache.refresh(key); | ||
break; | ||
} | ||
} | ||
}); | ||
|
@@ -95,14 +112,15 @@ public void run() throws InterruptedException { | |
private void status() { | ||
local.evictionLock.lock(); | ||
int pendingWrites = local.writeBuffer().size(); | ||
int drainStatus = local.drainStatus(); | ||
local.evictionLock.unlock(); | ||
|
||
LocalTime elapsedTime = LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS)); | ||
System.out.printf("---------- %s ----------%n", elapsedTime); | ||
System.out.printf("Pending reads: %,d; writes: %,d%n", local.readBuffer.size(), pendingWrites); | ||
System.out.printf("Drain status = %s%n", STATUS[local.drainStatus]); | ||
System.out.printf("Drain status = %s (%s)%n", STATUS[drainStatus], drainStatus); | ||
System.out.printf("Evictions = %,d%n", cache.stats().evictionCount()); | ||
System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), maximum); | ||
System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), operation.maxEntries); | ||
System.out.printf("Lock = [%s%n", StringUtils.substringAfter( | ||
local.evictionLock.toString(), "[")); | ||
System.out.printf("Pending tasks = %,d%n", | ||
|
@@ -121,4 +139,4 @@ private void status() { | |
public static void main(String[] args) throws Exception { | ||
new Stresser().run(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters