From 28b104ad376c43113ab24694742e19d972144702 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Thu, 29 Apr 2021 23:11:39 -0700 Subject: [PATCH] Fix timer wheel cascading and support cancelating a scheduled task When the timer wheel advances then higher level buckets evict to the lower wheels. In some cases this reorganization failed, such as when the nano time transitioned from a negative to a postive clock value. The number of steps to turn the wheel is now determined by a more accurate calculation. (fixes #541) When a scheduler is enabled then a future task may exist against the next expiration event to perform cache maintenance. If the cache becomes empty beforehand, e.g. Map.clear(), then this future can be canceled. (fixes #542) --- .../caffeine/cache/BoundedLocalCache.java | 10 +- .../caffeine/cache/LocalAsyncCache.java | 2 +- .../github/benmanes/caffeine/cache/Pacer.java | 15 +- .../benmanes/caffeine/cache/TimerWheel.java | 23 +-- .../caffeine/cache/BoundedLocalCacheTest.java | 57 ++++++ .../benmanes/caffeine/cache/PacerTest.java | 81 ++++++++- .../caffeine/cache/TimerWheelTest.java | 171 ++++++++++-------- 7 files changed, 262 insertions(+), 97 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 38595edf6c..c0e50da3b4 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -794,7 +794,9 @@ void expireEntries() { Pacer pacer = pacer(); if (pacer != null) { long delay = getExpirationDelay(now); - if (delay != Long.MAX_VALUE) { + if (delay == Long.MAX_VALUE) { + pacer.cancel(); + } else { pacer.schedule(executor, drainBuffersTask, now, delay); } } @@ -1816,6 +1818,12 @@ public void clear() { removeNode(node, now); } + // Cancel the scheduled cleanup + Pacer pacer = pacer(); + if (pacer != null) { + pacer.cancel(); + } + // Discard all pending reads readBuffer.drainTo(e -> {}); } finally { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java index 48ce3a9743..3706ec9fef 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java @@ -555,7 +555,7 @@ public void invalidateAll() { @Override public long estimatedSize() { - return asyncCache().cache().size(); + return asyncCache().cache().estimatedSize(); } @Override diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java index 7bededbcc0..6d5b331ced 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java @@ -49,10 +49,10 @@ public void schedule(Executor executor, Runnable command, long now, long delay) if (future == null) { // short-circuit an immediate scheduler causing an infinite loop during initialization - if (nextFireTime != 0) { + if (nextFireTime != 0L) { return; } - } else if ((nextFireTime - now) > 0) { + } else if ((nextFireTime - now) > 0L) { // Determine whether to reschedule if (maySkip(scheduleAt)) { return; @@ -63,12 +63,21 @@ public void schedule(Executor executor, Runnable command, long now, long delay) future = scheduler.schedule(executor, command, actualDelay, TimeUnit.NANOSECONDS); } + /** Attempts to cancel execution of the scheduled task, if present. */ + public void cancel() { + if (future != null) { + future.cancel(/* mayInterruptIfRunning */ false); + nextFireTime = 0L; + future = null; + } + } + /** * Returns if the current fire time is sooner, or if it is later and within the tolerance limit. */ boolean maySkip(long scheduleAt) { long delta = (scheduleAt - nextFireTime); - return (delta >= 0) || (-delta <= TOLERANCE); + return (delta >= 0L) || (-delta <= TOLERANCE); } /** Returns the delay and sets the next fire time. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java index 105826b596..a92c707eb6 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java @@ -98,6 +98,13 @@ public void advance(long currentTimeNanos) { long previousTimeNanos = nanos; try { nanos = currentTimeNanos; + + // If wrapping, temporarily shift the clock for a positive comparison + if ((previousTimeNanos < 0) && (currentTimeNanos > 0)) { + previousTimeNanos += Long.MAX_VALUE; + currentTimeNanos += Long.MAX_VALUE; + } + for (int i = 0; i < SHIFT.length; i++) { long previousTicks = (previousTimeNanos >>> SHIFT[i]); long currentTicks = (currentTimeNanos >>> SHIFT[i]); @@ -121,20 +128,14 @@ public void advance(long currentTimeNanos) { */ void expire(int index, long previousTicks, long currentTicks) { Node[] timerWheel = wheel[index]; + int mask = timerWheel.length - 1; - int start, end; - if ((currentTicks - previousTicks) >= timerWheel.length) { - end = timerWheel.length; - start = 0; - } else { - long mask = SPANS[index] - 1; - start = (int) (previousTicks & mask); - end = 1 + (int) (currentTicks & mask); - } + int steps = Math.min(1 + Math.abs((int) (currentTicks - previousTicks)), timerWheel.length); + int start = (int) (previousTicks & mask); + int end = start + steps; - int mask = timerWheel.length - 1; for (int i = start; i < end; i++) { - Node sentinel = timerWheel[(i & mask)]; + Node sentinel = timerWheel[i & mask]; Node prev = sentinel.getPreviousInVariableOrder(); Node node = sentinel.getNextInVariableOrder(); sentinel.setPreviousInVariableOrder(sentinel); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 22dd7bf13c..9d3926ec32 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -21,6 +21,9 @@ import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED; import static com.github.benmanes.caffeine.cache.BoundedLocalCache.EXPIRE_WRITE_TOLERANCE; import static com.github.benmanes.caffeine.cache.BoundedLocalCache.PERCENT_MAIN_PROTECTED; +import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_ACCESS; +import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_WRITE; +import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.VARIABLE; import static com.github.benmanes.caffeine.cache.testing.RemovalListenerVerifier.verifyRemovalListener; import static com.github.benmanes.caffeine.cache.testing.StatsVerifier.verifyStats; import static com.github.benmanes.caffeine.testing.Awaits.await; @@ -38,11 +41,14 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -60,6 +66,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure; @@ -854,4 +861,54 @@ public void put_expireTolerance_expiry(Cache cache, CacheConte assertThat(localCache.readBuffer.writes(), is(1)); assertThat(localCache.writeBuffer().producerIndex, is(8L)); } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO, + mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE }, + expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS }, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE) + public void unschedule_cleanUp(Cache cache, CacheContext context) { + Future future = Mockito.mock(Future.class); + BoundedLocalCache localCache = asBoundedLocalCache(cache); + doReturn(future).when(context.scheduler()).schedule(any(), any(), anyLong(), any()); + + for (int i = 0; i < 10; i++) { + cache.put(i, -i); + } + assertThat(localCache.pacer().nextFireTime, is(not(0L))); + assertThat(localCache.pacer().future, is(not(nullValue()))); + + context.ticker().advance(1, TimeUnit.HOURS); + cache.cleanUp(); + + verify(future).cancel(false); + assertThat(localCache.pacer().nextFireTime, is(0L)); + assertThat(localCache.pacer().future, is(nullValue())); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO, + mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE }, + expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS }, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE) + public void unschedule_invalidateAll(Cache cache, CacheContext context) { + Future future = Mockito.mock(Future.class); + BoundedLocalCache localCache = asBoundedLocalCache(cache); + doReturn(future).when(context.scheduler()).schedule(any(), any(), anyLong(), any()); + + for (int i = 0; i < 10; i++) { + cache.put(i, -i); + } + assertThat(localCache.pacer().nextFireTime, is(not(0L))); + assertThat(localCache.pacer().future, is(not(nullValue()))); + + cache.invalidateAll(); + verify(future).cancel(false); + assertThat(localCache.pacer().nextFireTime, is(0L)); + assertThat(localCache.pacer().future, is(nullValue())); + } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java index d371b8ae97..7eea700269 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java @@ -17,7 +17,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -65,7 +67,54 @@ public void afterMethod() throws Exception { } @Test - public void scheduledAfterNextFireTime_skip() { + public void schedule_initialize() { + long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE)); + doReturn(DisabledFuture.INSTANCE) + .when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); + pacer.schedule(executor, command, NOW, delay); + + assertThat(pacer.future, is(DisabledFuture.INSTANCE)); + assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE)); + } + + @Test + public void schedule_initialize_recurse() { + long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE)); + doAnswer(invocation -> { + assertThat(pacer.future, is(nullValue())); + assertThat(pacer.nextFireTime, is(not(0L))); + pacer.schedule(executor, command, NOW, delay); + return DisabledFuture.INSTANCE; + }).when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); + + pacer.schedule(executor, command, NOW, delay); + assertThat(pacer.future, is(DisabledFuture.INSTANCE)); + assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE)); + } + + @Test + public void schedule_cancel_schedule() { + long fireTime = NOW + Pacer.TOLERANCE; + long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE)); + doReturn(future) + .when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); + + pacer.schedule(executor, command, NOW, delay); + assertThat(pacer.nextFireTime, is(fireTime)); + assertThat(pacer.future, is(future)); + + pacer.cancel(); + verify(future).cancel(false); + assertThat(pacer.nextFireTime, is(0L)); + assertThat(pacer.future, is(nullValue())); + + pacer.schedule(executor, command, NOW, delay); + assertThat(pacer.nextFireTime, is(fireTime)); + assertThat(pacer.future, is(future)); + } + + @Test + public void scheduled_afterNextFireTime_skip() { pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; pacer.future = future; @@ -78,7 +127,7 @@ public void scheduledAfterNextFireTime_skip() { } @Test - public void scheduledBeforeNextFireTime_skip() { + public void schedule_beforeNextFireTime_skip() { pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; pacer.future = future; @@ -93,7 +142,7 @@ public void scheduledBeforeNextFireTime_skip() { } @Test - public void scheduledBeforeNextFireTime_minimumDelay() { + public void schedule_beforeNextFireTime_minimumDelay() { pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; pacer.future = future; @@ -105,7 +154,7 @@ public void scheduledBeforeNextFireTime_minimumDelay() { assertThat(pacer.future, is(DisabledFuture.INSTANCE)); assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE)); - verify(future).cancel(anyBoolean()); + verify(future).cancel(false); verify(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); verifyNoInteractions(executor, command); @@ -113,7 +162,7 @@ public void scheduledBeforeNextFireTime_minimumDelay() { } @Test - public void scheduledBeforeNextFireTime_customDelay() { + public void schedule_beforeNextFireTime_customDelay() { pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; pacer.future = future; @@ -125,10 +174,28 @@ public void scheduledBeforeNextFireTime_customDelay() { assertThat(pacer.future, is(DisabledFuture.INSTANCE)); assertThat(pacer.nextFireTime, is(NOW + delay)); - verify(future).cancel(anyBoolean()); + verify(future).cancel(false); verify(scheduler).schedule(executor, command, delay, TimeUnit.NANOSECONDS); verifyNoInteractions(executor, command); verifyNoMoreInteractions(scheduler, future); } + + @Test + public void cancel_initialize() { + pacer.cancel(); + assertThat(pacer.nextFireTime, is(0L)); + assertThat(pacer.future, is(nullValue())); + } + + @Test + public void cancel_scheduled() { + pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; + pacer.future = future; + + pacer.cancel(); + verify(future).cancel(false); + assertThat(pacer.nextFireTime, is(0L)); + assertThat(pacer.future, is(nullValue())); + } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java index e66e704444..4c1888cb4e 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java @@ -54,6 +54,7 @@ import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -64,7 +65,8 @@ @SuppressWarnings("GuardedBy") public final class TimerWheelTest { private static final Random random = new Random(); - private static final long NOW = random.nextLong(); + private static final long[] CLOCKS = { -SPANS[0] + 1, 0L, 0xfffffffc0000000L, + Long.MAX_VALUE - SPANS[0] + 1, random.nextLong() }; @Captor ArgumentCaptor> captor; @Mock BoundedLocalCache cache; @@ -85,93 +87,96 @@ public void afterMethod() throws Exception { } @Test(dataProvider = "schedule") - public void schedule(long nanos, int expired) { + public void schedule(long clock, long duration, int expired) { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; for (int timeout : new int[] { 25, 90, 240 }) { - timerWheel.schedule(new Timer(NOW + TimeUnit.SECONDS.toNanos(timeout))); + timerWheel.schedule(new Timer(clock + TimeUnit.SECONDS.toNanos(timeout))); } - timerWheel.advance(NOW + nanos); + timerWheel.advance(clock + duration); verify(cache, times(expired)).evictEntry(any(), any(), anyLong()); for (Node node : captor.getAllValues()) { - assertThat(node.getVariableTime(), is(lessThan(NOW + nanos))); + assertThat(node.getVariableTime(), is(lessThan(clock + duration))); } } - @DataProvider(name = "schedule") - public Object[][] providesSchedule() { - return new Object[][] { - { TimeUnit.SECONDS.toNanos(10), 0 }, - { TimeUnit.MINUTES.toNanos(3), 2 }, - { TimeUnit.MINUTES.toNanos(10), 3 } - }; - } - @Test(dataProvider = "fuzzySchedule") - public void schedule_fuzzy(long clock, long nanos, long[] times) { + public void schedule_fuzzy(long clock, long duration, long[] times) { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); timerWheel.nanos = clock; int expired = 0; for (long timeout : times) { - if (timeout <= nanos) { + if (timeout <= duration) { expired++; } timerWheel.schedule(new Timer(timeout)); } - timerWheel.advance(nanos); + timerWheel.advance(duration); verify(cache, times(expired)).evictEntry(any(), any(), anyLong()); for (Node node : captor.getAllValues()) { - assertThat(node.getVariableTime(), is(lessThan(nanos))); + assertThat(node.getVariableTime(), is(lessThan(duration))); } - checkTimerWheel(nanos); + checkTimerWheel(duration); + } + + + @Test(dataProvider = "clock") + public void advance(long clock) { + when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); + + timerWheel.nanos = clock; + timerWheel.schedule(new Timer(timerWheel.nanos + SPANS[0])); + + timerWheel.advance(clock + 13 * SPANS[0]); + verify(cache).evictEntry(any(), any(), anyLong()); } - @Test - public void getExpirationDelay_empty() { + @Test(dataProvider = "clock") + public void getExpirationDelay_empty(long clock) { when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; assertThat(timerWheel.getExpirationDelay(), is(Long.MAX_VALUE)); } - @Test - public void getExpirationDelay_firstWheel() { + @Test(dataProvider = "clock") + public void getExpirationDelay_firstWheel(long clock) { when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; long delay = Duration.ofSeconds(1).toNanos(); - timerWheel.schedule(new Timer(NOW + delay)); + timerWheel.schedule(new Timer(clock + delay)); assertThat(timerWheel.getExpirationDelay(), is(lessThanOrEqualTo(SPANS[0]))); } - @Test - public void getExpirationDelay_lastWheel() { + @Test(dataProvider = "clock") + public void getExpirationDelay_lastWheel(long clock) { when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; long delay = Duration.ofDays(14).toNanos(); - timerWheel.schedule(new Timer(NOW + delay)); + timerWheel.schedule(new Timer(clock + delay)); assertThat(timerWheel.getExpirationDelay(), is(lessThanOrEqualTo(delay))); } - @Test - public void getExpirationDelay_hierarchy() { + @Test(dataProvider = "clock") + public void getExpirationDelay_hierarchy(long clock) { when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; - long t15 = NOW + Duration.ofSeconds(15).toNanos(); // in wheel[0] - long t80 = NOW + Duration.ofSeconds(80).toNanos(); // in wheel[1] + long t15 = clock + Duration.ofSeconds(15).toNanos(); // in wheel[0] + long t80 = clock + Duration.ofSeconds(80).toNanos(); // in wheel[1] timerWheel.schedule(new Timer(t15)); timerWheel.schedule(new Timer(t80)); - long t45 = NOW + Duration.ofSeconds(45).toNanos(); // discard T15, T80 in wheel[1] + long t45 = clock + Duration.ofSeconds(45).toNanos(); // discard T15, T80 in wheel[1] timerWheel.advance(t45); - long t95 = NOW + Duration.ofSeconds(95).toNanos(); // in wheel[0], but expires after T80 + long t95 = clock + Duration.ofSeconds(95).toNanos(); // in wheel[0], but expires after T80 timerWheel.schedule(new Timer(t95)); long expectedDelay = (t80 - t45); @@ -180,13 +185,13 @@ public void getExpirationDelay_hierarchy() { } @Test(dataProvider = "fuzzySchedule", invocationCount = 25) - public void getExpirationDelay_fuzzy(long clock, long nanos, long[] times) { + public void getExpirationDelay_fuzzy(long clock, long duration, long[] times) { when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); timerWheel.nanos = clock; for (long timeout : times) { timerWheel.schedule(new Timer(timeout)); } - timerWheel.advance(nanos); + timerWheel.advance(duration); long minDelay = Long.MAX_VALUE; int minSpan = Integer.MAX_VALUE; @@ -219,6 +224,22 @@ public void getExpirationDelay_fuzzy(long clock, long nanos, long[] times) { } } + @DataProvider(name = "clock") + public Iterator providesClock() { + return Longs.asList(CLOCKS).stream().map(o -> (Object) o).iterator(); + } + + @DataProvider(name = "schedule") + public Iterator providesSchedule() { + List args = new ArrayList<>(); + for (long clock : CLOCKS) { + args.add(new Object[] { clock, TimeUnit.SECONDS.toNanos(10), 0 }); + args.add(new Object[] { clock, TimeUnit.MINUTES.toNanos(3), 2 }); + args.add(new Object[] { clock, TimeUnit.MINUTES.toNanos(10), 3 }); + } + return args.iterator(); + } + @DataProvider(name = "fuzzySchedule") public Object[][] providesFuzzySchedule() { long[] times = new long[5_000]; @@ -227,17 +248,17 @@ public Object[][] providesFuzzySchedule() { for (int i = 0; i < times.length; i++) { times[i] = ThreadLocalRandom.current().nextLong(clock + 1, bound); } - long nanos = ThreadLocalRandom.current().nextLong(clock + 1, bound); - return new Object[][] {{ clock, nanos, times }}; + long duration = ThreadLocalRandom.current().nextLong(clock + 1, bound); + return new Object[][] {{ clock, duration, times }}; } - private void checkTimerWheel(long nanos) { + private void checkTimerWheel(long duration) { for (int i = 0; i < timerWheel.wheel.length; i++) { for (int j = 0; j < timerWheel.wheel[i].length; j++) { for (long timer : getTimers(timerWheel.wheel[i][j])) { - if (timer <= nanos) { + if (timer <= duration) { throw new AssertionError(String.format("wheel[%s][%d] by %ss", i, j, - TimeUnit.NANOSECONDS.toSeconds(nanos - timer))); + TimeUnit.NANOSECONDS.toSeconds(duration - timer))); } } } @@ -253,20 +274,20 @@ private LongArrayList getTimers(Node sentinel) { return timers; } - @Test - public void reschedule() { + @Test(dataProvider = "clock") + public void reschedule(long clock) { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); - timerWheel.nanos = NOW; + timerWheel.nanos = clock; - Timer timer = new Timer(NOW + TimeUnit.MINUTES.toNanos(15)); + Timer timer = new Timer(clock + TimeUnit.MINUTES.toNanos(15)); timerWheel.schedule(timer); Node startBucket = timer.getNextInVariableOrder(); - timer.setVariableTime(NOW + TimeUnit.HOURS.toNanos(2)); + timer.setVariableTime(clock + TimeUnit.HOURS.toNanos(2)); timerWheel.reschedule(timer); assertThat(timer.getNextInVariableOrder(), is(not(startBucket))); - timerWheel.advance(NOW + TimeUnit.DAYS.toNanos(1)); + timerWheel.advance(clock + TimeUnit.DAYS.toNanos(1)); checkEmpty(); } @@ -280,20 +301,20 @@ private void checkEmpty() { } } - @Test - public void deschedule() { - Timer timer = new Timer(NOW + 100); - timerWheel.nanos = NOW; + @Test(dataProvider = "clock") + public void deschedule(long clock) { + Timer timer = new Timer(clock + 100); + timerWheel.nanos = clock; timerWheel.schedule(timer); timerWheel.deschedule(timer); assertThat(timer.getNextInVariableOrder(), is(nullValue())); assertThat(timer.getPreviousInVariableOrder(), is(nullValue())); } - @Test - public void deschedule_notScheduled() { - timerWheel.nanos = NOW; - timerWheel.deschedule(new Timer(NOW + 100)); + @Test(dataProvider = "clock") + public void deschedule_notScheduled(long clock) { + timerWheel.nanos = clock; + timerWheel.deschedule(new Timer(clock + 100)); } @Test(dataProvider = "fuzzySchedule") @@ -312,17 +333,17 @@ public void deschedule_fuzzy(long clock, long nanos, long[] times) { checkTimerWheel(nanos); } - @Test - public void expire_reschedule() { + @Test(dataProvider = "clock") + public void expire_reschedule(long clock) { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenAnswer(invocation -> { Timer timer = (Timer) invocation.getArgument(0); timer.setVariableTime(timerWheel.nanos + 100); return false; }); - timerWheel.nanos = NOW; - timerWheel.schedule(new Timer(NOW + 100)); - timerWheel.advance(NOW + TimerWheel.SPANS[0]); + timerWheel.nanos = clock; + timerWheel.schedule(new Timer(clock + 100)); + timerWheel.advance(clock + SPANS[0]); verify(cache).evictEntry(any(), any(), anyLong()); assertThat(captor.getValue().getNextInVariableOrder(), is(not(nullValue()))); @@ -330,10 +351,10 @@ public void expire_reschedule() { } @Test(dataProvider = "cascade") - public void cascade(long nanos, long timeout, int span) { - timerWheel.nanos = NOW; - timerWheel.schedule(new Timer(NOW + timeout)); - timerWheel.advance(NOW + nanos); + public void cascade(long clock, long duration, long timeout, int span) { + timerWheel.nanos = clock; + timerWheel.schedule(new Timer(clock + timeout)); + timerWheel.advance(clock + duration); int count = 0; for (int i = 0; i <= span; i++) { @@ -347,11 +368,13 @@ public void cascade(long nanos, long timeout, int span) { @DataProvider(name = "cascade") public Iterator providesCascade() { List args = new ArrayList<>(); - for (int i = 1; i < TimerWheel.SPANS.length - 1; i++) { - long duration = TimerWheel.SPANS[i]; - long timeout = ThreadLocalRandom.current().nextLong(duration + 1, 2 * duration); - long nanos = ThreadLocalRandom.current().nextLong(duration + 1, timeout - 1); - args.add(new Object[] { nanos, timeout, i}); + for (int i = 1; i < SPANS.length - 1; i++) { + long span = SPANS[i]; + long timeout = ThreadLocalRandom.current().nextLong(span + 1, 2 * span); + long duration = ThreadLocalRandom.current().nextLong(span + 1, timeout - 1); + for (long clock : CLOCKS) { + args.add(new Object[] { clock, duration, timeout, i}); + } } return args.iterator(); } @@ -380,7 +403,7 @@ private List snapshot(boolean ascending, int limit, Function t @DataProvider(name="snapshot") public Iterator providesSnaphot() { List scenarios = new ArrayList<>(); - for (long clock : new long[] {0L, NOW }) { + for (long clock : CLOCKS) { for (int limit : new int[] { 10, 100 }) { scenarios.addAll(Arrays.asList( new Object[] { /* ascending */ true, limit, clock, Mockito.mock(Function.class) },