Skip to content

Commit

Permalink
Fix write-time optimization for variable expiration (fixes #478)
Browse files Browse the repository at this point in the history
A high write rate to the same key can overwhelm the write buffer as it
may not drop entries and has a maximum capacity. When full this causes
backpressure to allow for the mainance task to catch up. A write only
needs to be recorded in this buffer when a major event occurs, such as
the entry's size changed or expiration time differs.

To improve throughput for expireAfterWrite a tolerance of 1s is used
to allow for skipping the write buffer and recording into the read
buffer instead. This improves throughput by 5x in a same key write
benchmark.

This optimization was not updated to take into account variable
expiration, where the expire time may change based on the calculation
determined by configured Expiry. This would cause the entry to not be
reordered in the TimerWheel, possibly delaying the automic removal
indefinitely. Now when the variable time differs by +/- 1s then the
write-time reordering is required.

Co-authored-by: Christopher Ng <[email protected]>
  • Loading branch information
ben-manes and facboy committed Dec 7, 2020
1 parent a54afaa commit 83ae0dc
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,7 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
int oldWeight;
boolean expired = false;
boolean mayUpdate = true;
boolean exceedsTolerance = true;
boolean exceedsTolerance = false;
synchronized (prior) {
if (!prior.isAlive()) {
continue;
Expand All @@ -2051,7 +2051,10 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
writer.write(key, value);
}
if (mayUpdate) {
exceedsTolerance = ((now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE);
exceedsTolerance =
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable()
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

setWriteTime(prior, now);
prior.setWeight(newWeight);
Expand All @@ -2075,7 +2078,7 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
if ((oldValue == null) || (weightedDifference != 0) || expired) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else if (!onlyIfAbsent && expiresAfterWrite() && exceedsTolerance) {
} else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
if (mayUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_IDLE;
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_REQUIRED;
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.EXPIRE_WRITE_TOLERANCE;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.PERCENT_MAIN_PROTECTED;
import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications;
import static com.github.benmanes.caffeine.cache.testing.HasStats.hasEvictionCount;
Expand All @@ -34,10 +35,13 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -85,6 +89,8 @@ static BoundedLocalCache<Integer, Integer> asBoundedLocalCache(Cache<Integer, In
return (BoundedLocalCache<Integer, Integer>) cache.asMap();
}

/* --------------- Maintenance --------------- */

@Test
@SuppressWarnings("UnusedVariable")
public void cleanupTask_allowGc() {
Expand Down Expand Up @@ -169,6 +175,8 @@ public void scheduleDrainBuffers_rejected(Cache<Integer, Integer> cache, CacheCo
cache.put(context.absentKey(), context.absentValue());
}

/* --------------- Eviction --------------- */

@Test
public void putWeighted_noOverflow() {
Cache<Integer, Integer> cache = Caffeine.newBuilder()
Expand Down Expand Up @@ -708,4 +716,75 @@ private void adapt(Cache<Integer, Integer> cache,
// Fill main protected space
cache.asMap().keySet().stream().forEach(cache::getIfPresent);
}

/* --------------- Expiration --------------- */

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, initialCapacity = InitialCapacity.FULL,
expireAfterWrite = Expire.ONE_MINUTE)
public void put_expireTolerance_expireAfterWrite(
Cache<Integer, Integer> cache, CacheContext context) {
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
boolean mayCheckReads = context.isStrongKeys() && context.isStrongValues()
&& localCache.readBuffer != Buffer.<Node<Integer, Integer>>disabled();

cache.put(1, 1);
assertThat(localCache.writeBuffer().producerIndex, is(2L));

// If within the tolerance, treat the update as a read
cache.put(1, 2);
if (mayCheckReads) {
assertThat(localCache.readBuffer.reads(), is(0));
assertThat(localCache.readBuffer.writes(), is(1));
}
assertThat(localCache.writeBuffer().producerIndex, is(2L));

// If exceeds the tolerance, treat the update as a write
context.ticker().advance(EXPIRE_WRITE_TOLERANCE + 1, TimeUnit.NANOSECONDS);
cache.put(1, 3);
if (mayCheckReads) {
assertThat(localCache.readBuffer.reads(), is(1));
assertThat(localCache.readBuffer.writes(), is(1));
}
assertThat(localCache.writeBuffer().producerIndex, is(4L));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expiry = CacheExpiry.MOCKITO, expiryTime = Expire.ONE_MINUTE)
public void put_expireTolerance_expiry(Cache<Integer, Integer> cache, CacheContext context) {
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
cache.put(1, 1);
assertThat(localCache.writeBuffer().producerIndex, is(2L));

// If within the tolerance, treat the update as a read
cache.put(1, 2);
assertThat(localCache.readBuffer.reads(), is(0));
assertThat(localCache.readBuffer.writes(), is(1));
assertThat(localCache.writeBuffer().producerIndex, is(2L));

// If exceeds the tolerance, treat the update as a write
context.ticker().advance(EXPIRE_WRITE_TOLERANCE + 1, TimeUnit.NANOSECONDS);
cache.put(1, 3);
assertThat(localCache.readBuffer.reads(), is(1));
assertThat(localCache.readBuffer.writes(), is(1));
assertThat(localCache.writeBuffer().producerIndex, is(4L));

// If the expire time reduces by more than the tolerance, treat the update as a write
when(context.expiry().expireAfterUpdate(any(), any(), anyLong(), anyLong()))
.thenReturn(Expire.ONE_MILLISECOND.timeNanos());
cache.put(1, 4);
assertThat(localCache.readBuffer.reads(), is(1));
assertThat(localCache.readBuffer.writes(), is(1));
assertThat(localCache.writeBuffer().producerIndex, is(6L));

// If the expire time increases by more than the tolerance, treat the update as a write
when(context.expiry().expireAfterUpdate(any(), any(), anyLong(), anyLong()))
.thenReturn(Expire.FOREVER.timeNanos());
cache.put(1, 4);
assertThat(localCache.readBuffer.reads(), is(1));
assertThat(localCache.readBuffer.writes(), is(1));
assertThat(localCache.writeBuffer().producerIndex, is(8L));
}
}
1 change: 1 addition & 0 deletions checksum.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
<trusted-key id='94b291aef984a085' group='io.reactivex.rxjava3' />
<trusted-key id='5e2f2b3d474efe6b' group='it.unimi.dsi' />
<trusted-key id='823dba283a93ee3c' group='it.unimi.dsi' />
<trusted-key id='ca85ffe638d4407a' group='it.unimi.dsi' />
<trusted-key id='d908a43fb7ec07ac' group='jakarta.activation' />
<trusted-key id='0e325becb6962a24' group='jakarta.annotation' />
<trusted-key id='0aa3e5c3d232e79b' group='jakarta.inject' />
Expand Down
8 changes: 4 additions & 4 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
ext {
versions = [
akka: '2.6.10',
cache2k: '1.9.3.Alpha',
checkerFramework: '3.7.1',
cache2k: '1.9.4.Beta',
checkerFramework: '3.8.0',
coherence: '20.06',
collision: '0.3.3',
commonsCompress: '1.20',
Expand All @@ -41,7 +41,7 @@ ext {
elasticSearch: '7.10.0',
expiringMap: '0.5.9',
fastfilter: 'bf0b02297f',
fastutil: '8.4.3',
fastutil: '8.4.4',
flipTables: '1.1.0',
googleJavaFormat: '1.7',
guava: '30.0-jre',
Expand Down Expand Up @@ -97,7 +97,7 @@ ext {
semanticVersioning: '1.1.0',
shadow: '6.1.0',
sonarqube: '3.0',
spotbugs: '4.1.4',
spotbugs: '4.2.0',
spotbugsPlugin: '4.6.0',
stats: '0.2.2',
versions: '0.36.0',
Expand Down

0 comments on commit 83ae0dc

Please sign in to comment.