Skip to content

Commit

Permalink
Fix timer wheel cascading and support cancelating a scheduled task
Browse files Browse the repository at this point in the history
When the timer wheel advances then higher level buckets evict to the
lower wheels. In some cases this reorganization failed, such as when
the nano time transitioned from a negative to a postive clock value.
The number of steps to turn the wheel is now determined by a more
accurate calculation. (fixes #541)

When a scheduler is enabled then a future task may exist against the
next expiration event to perform cache maintenance. If the cache
becomes empty beforehand, e.g. Map.clear(), then this future can be
canceled. (fixes #542)
  • Loading branch information
ben-manes committed May 1, 2021
1 parent f5a5e32 commit 28b104a
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,9 @@ void expireEntries() {
Pacer pacer = pacer();
if (pacer != null) {
long delay = getExpirationDelay(now);
if (delay != Long.MAX_VALUE) {
if (delay == Long.MAX_VALUE) {
pacer.cancel();
} else {
pacer.schedule(executor, drainBuffersTask, now, delay);
}
}
Expand Down Expand Up @@ -1816,6 +1818,12 @@ public void clear() {
removeNode(node, now);
}

// Cancel the scheduled cleanup
Pacer pacer = pacer();
if (pacer != null) {
pacer.cancel();
}

// Discard all pending reads
readBuffer.drainTo(e -> {});
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public void invalidateAll() {

@Override
public long estimatedSize() {
return asyncCache().cache().size();
return asyncCache().cache().estimatedSize();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public void schedule(Executor executor, Runnable command, long now, long delay)

if (future == null) {
// short-circuit an immediate scheduler causing an infinite loop during initialization
if (nextFireTime != 0) {
if (nextFireTime != 0L) {
return;
}
} else if ((nextFireTime - now) > 0) {
} else if ((nextFireTime - now) > 0L) {
// Determine whether to reschedule
if (maySkip(scheduleAt)) {
return;
Expand All @@ -63,12 +63,21 @@ public void schedule(Executor executor, Runnable command, long now, long delay)
future = scheduler.schedule(executor, command, actualDelay, TimeUnit.NANOSECONDS);
}

/** Attempts to cancel execution of the scheduled task, if present. */
public void cancel() {
if (future != null) {
future.cancel(/* mayInterruptIfRunning */ false);
nextFireTime = 0L;
future = null;
}
}

/**
* Returns if the current fire time is sooner, or if it is later and within the tolerance limit.
*/
boolean maySkip(long scheduleAt) {
long delta = (scheduleAt - nextFireTime);
return (delta >= 0) || (-delta <= TOLERANCE);
return (delta >= 0L) || (-delta <= TOLERANCE);
}

/** Returns the delay and sets the next fire time. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ public void advance(long currentTimeNanos) {
long previousTimeNanos = nanos;
try {
nanos = currentTimeNanos;

// If wrapping, temporarily shift the clock for a positive comparison
if ((previousTimeNanos < 0) && (currentTimeNanos > 0)) {
previousTimeNanos += Long.MAX_VALUE;
currentTimeNanos += Long.MAX_VALUE;
}

for (int i = 0; i < SHIFT.length; i++) {
long previousTicks = (previousTimeNanos >>> SHIFT[i]);
long currentTicks = (currentTimeNanos >>> SHIFT[i]);
Expand All @@ -121,20 +128,14 @@ public void advance(long currentTimeNanos) {
*/
void expire(int index, long previousTicks, long currentTicks) {
Node<K, V>[] timerWheel = wheel[index];
int mask = timerWheel.length - 1;

int start, end;
if ((currentTicks - previousTicks) >= timerWheel.length) {
end = timerWheel.length;
start = 0;
} else {
long mask = SPANS[index] - 1;
start = (int) (previousTicks & mask);
end = 1 + (int) (currentTicks & mask);
}
int steps = Math.min(1 + Math.abs((int) (currentTicks - previousTicks)), timerWheel.length);
int start = (int) (previousTicks & mask);
int end = start + steps;

int mask = timerWheel.length - 1;
for (int i = start; i < end; i++) {
Node<K, V> sentinel = timerWheel[(i & mask)];
Node<K, V> sentinel = timerWheel[i & mask];
Node<K, V> prev = sentinel.getPreviousInVariableOrder();
Node<K, V> node = sentinel.getNextInVariableOrder();
sentinel.setPreviousInVariableOrder(sentinel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.EXPIRE_WRITE_TOLERANCE;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.PERCENT_MAIN_PROTECTED;
import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_ACCESS;
import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_WRITE;
import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.VARIABLE;
import static com.github.benmanes.caffeine.cache.testing.RemovalListenerVerifier.verifyRemovalListener;
import static com.github.benmanes.caffeine.cache.testing.StatsVerifier.verifyStats;
import static com.github.benmanes.caffeine.testing.Awaits.await;
Expand All @@ -38,11 +41,14 @@
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -60,6 +66,7 @@
import com.github.benmanes.caffeine.cache.testing.CacheSpec;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure;
Expand Down Expand Up @@ -854,4 +861,54 @@ public void put_expireTolerance_expiry(Cache<Integer, Integer> cache, CacheConte
assertThat(localCache.readBuffer.writes(), is(1));
assertThat(localCache.writeBuffer().producerIndex, is(8L));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE)
public void unschedule_cleanUp(Cache<Integer, Integer> cache, CacheContext context) {
Future<?> future = Mockito.mock(Future.class);
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
doReturn(future).when(context.scheduler()).schedule(any(), any(), anyLong(), any());

for (int i = 0; i < 10; i++) {
cache.put(i, -i);
}
assertThat(localCache.pacer().nextFireTime, is(not(0L)));
assertThat(localCache.pacer().future, is(not(nullValue())));

context.ticker().advance(1, TimeUnit.HOURS);
cache.cleanUp();

verify(future).cancel(false);
assertThat(localCache.pacer().nextFireTime, is(0L));
assertThat(localCache.pacer().future, is(nullValue()));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE)
public void unschedule_invalidateAll(Cache<Integer, Integer> cache, CacheContext context) {
Future<?> future = Mockito.mock(Future.class);
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
doReturn(future).when(context.scheduler()).schedule(any(), any(), anyLong(), any());

for (int i = 0; i < 10; i++) {
cache.put(i, -i);
}
assertThat(localCache.pacer().nextFireTime, is(not(0L)));
assertThat(localCache.pacer().future, is(not(nullValue())));

cache.invalidateAll();
verify(future).cancel(false);
assertThat(localCache.pacer().nextFireTime, is(0L));
assertThat(localCache.pacer().future, is(nullValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -65,7 +67,54 @@ public void afterMethod() throws Exception {
}

@Test
public void scheduledAfterNextFireTime_skip() {
public void schedule_initialize() {
long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE));
doReturn(DisabledFuture.INSTANCE)
.when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS);
pacer.schedule(executor, command, NOW, delay);

assertThat(pacer.future, is(DisabledFuture.INSTANCE));
assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE));
}

@Test
public void schedule_initialize_recurse() {
long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE));
doAnswer(invocation -> {
assertThat(pacer.future, is(nullValue()));
assertThat(pacer.nextFireTime, is(not(0L)));
pacer.schedule(executor, command, NOW, delay);
return DisabledFuture.INSTANCE;
}).when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS);

pacer.schedule(executor, command, NOW, delay);
assertThat(pacer.future, is(DisabledFuture.INSTANCE));
assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE));
}

@Test
public void schedule_cancel_schedule() {
long fireTime = NOW + Pacer.TOLERANCE;
long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE));
doReturn(future)
.when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS);

pacer.schedule(executor, command, NOW, delay);
assertThat(pacer.nextFireTime, is(fireTime));
assertThat(pacer.future, is(future));

pacer.cancel();
verify(future).cancel(false);
assertThat(pacer.nextFireTime, is(0L));
assertThat(pacer.future, is(nullValue()));

pacer.schedule(executor, command, NOW, delay);
assertThat(pacer.nextFireTime, is(fireTime));
assertThat(pacer.future, is(future));
}

@Test
public void scheduled_afterNextFireTime_skip() {
pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS;
pacer.future = future;

Expand All @@ -78,7 +127,7 @@ public void scheduledAfterNextFireTime_skip() {
}

@Test
public void scheduledBeforeNextFireTime_skip() {
public void schedule_beforeNextFireTime_skip() {
pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS;
pacer.future = future;

Expand All @@ -93,7 +142,7 @@ public void scheduledBeforeNextFireTime_skip() {
}

@Test
public void scheduledBeforeNextFireTime_minimumDelay() {
public void schedule_beforeNextFireTime_minimumDelay() {
pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS;
pacer.future = future;

Expand All @@ -105,15 +154,15 @@ public void scheduledBeforeNextFireTime_minimumDelay() {
assertThat(pacer.future, is(DisabledFuture.INSTANCE));
assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE));

verify(future).cancel(anyBoolean());
verify(future).cancel(false);
verify(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS);

verifyNoInteractions(executor, command);
verifyNoMoreInteractions(scheduler, future);
}

@Test
public void scheduledBeforeNextFireTime_customDelay() {
public void schedule_beforeNextFireTime_customDelay() {
pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS;
pacer.future = future;

Expand All @@ -125,10 +174,28 @@ public void scheduledBeforeNextFireTime_customDelay() {
assertThat(pacer.future, is(DisabledFuture.INSTANCE));
assertThat(pacer.nextFireTime, is(NOW + delay));

verify(future).cancel(anyBoolean());
verify(future).cancel(false);
verify(scheduler).schedule(executor, command, delay, TimeUnit.NANOSECONDS);

verifyNoInteractions(executor, command);
verifyNoMoreInteractions(scheduler, future);
}

@Test
public void cancel_initialize() {
pacer.cancel();
assertThat(pacer.nextFireTime, is(0L));
assertThat(pacer.future, is(nullValue()));
}

@Test
public void cancel_scheduled() {
pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS;
pacer.future = future;

pacer.cancel();
verify(future).cancel(false);
assertThat(pacer.nextFireTime, is(0L));
assertThat(pacer.future, is(nullValue()));
}
}
Loading

0 comments on commit 28b104a

Please sign in to comment.