From de72842ab7577af6725ba35be992c75daa37defc Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Tue, 7 May 2024 18:36:30 -0700 Subject: [PATCH] ConcurrentLfu time-based expiry (#516) * wheel+nodes * fix typo * 64bit * +ConcurrentLfuCore * undo bitops * undo bitops * node * cleanup merge * simplify generics * outline tests * more tests * comments * use Duration, cleanup * rough end to end * nullability static analysis * schedule * assert wheel pos * port all tests * test e2e * policy * cleanup * test coverage * explicit interface impl * rem comment * mem layout --------- --- .../BitFaster.Caching.UnitTests.csproj | 1 + .../ExpireAfterAccessTests.cs | 41 ++ .../ExpireAfterWriteTests.cs | 42 ++ .../Lfu/ConcurrentLfuCoreTests.cs | 202 +++++++ .../Lfu/ConcurrentTLfuSoakTests.cs | 44 ++ .../Lfu/ConcurrentTLfuTests.cs | 184 +++++++ .../Lfu/NodeMemoryLayoutDumps.cs | 96 ++++ .../Lfu/TimerWheelTests.cs | 423 +++++++++++++++ BitFaster.Caching.UnitTests/Timed.cs | 6 +- BitFaster.Caching/Duration.cs | 18 + BitFaster.Caching/ExpireAfterAccess.cs | 31 ++ BitFaster.Caching/ExpireAfterWrite.cs | 31 ++ BitFaster.Caching/Lfu/ConcurrentLfu.cs | 498 +++++++++--------- BitFaster.Caching/Lfu/ConcurrentLfuCore.cs | 77 ++- BitFaster.Caching/Lfu/ConcurrentTLfu.cs | 193 +++++++ BitFaster.Caching/Lfu/LfuNode.cs | 199 ++++--- BitFaster.Caching/Lfu/NodePolicy.cs | 114 +++- BitFaster.Caching/Lfu/TimerWheel.cs | 287 ++++++++++ 18 files changed, 2147 insertions(+), 340 deletions(-) create mode 100644 BitFaster.Caching.UnitTests/ExpireAfterAccessTests.cs create mode 100644 BitFaster.Caching.UnitTests/ExpireAfterWriteTests.cs create mode 100644 BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuCoreTests.cs create mode 100644 BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuSoakTests.cs create mode 100644 BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuTests.cs create mode 100644 BitFaster.Caching.UnitTests/Lfu/NodeMemoryLayoutDumps.cs create mode 100644 BitFaster.Caching.UnitTests/Lfu/TimerWheelTests.cs create mode 100644 BitFaster.Caching/ExpireAfterAccess.cs create mode 100644 BitFaster.Caching/ExpireAfterWrite.cs create mode 100644 BitFaster.Caching/Lfu/ConcurrentTLfu.cs create mode 100644 BitFaster.Caching/Lfu/TimerWheel.cs diff --git a/BitFaster.Caching.UnitTests/BitFaster.Caching.UnitTests.csproj b/BitFaster.Caching.UnitTests/BitFaster.Caching.UnitTests.csproj index 03625289..40a51510 100644 --- a/BitFaster.Caching.UnitTests/BitFaster.Caching.UnitTests.csproj +++ b/BitFaster.Caching.UnitTests/BitFaster.Caching.UnitTests.csproj @@ -13,6 +13,7 @@ + all diff --git a/BitFaster.Caching.UnitTests/ExpireAfterAccessTests.cs b/BitFaster.Caching.UnitTests/ExpireAfterAccessTests.cs new file mode 100644 index 00000000..4c64737e --- /dev/null +++ b/BitFaster.Caching.UnitTests/ExpireAfterAccessTests.cs @@ -0,0 +1,41 @@ +using System; +using FluentAssertions; +using Xunit; + +namespace BitFaster.Caching.UnitTests +{ + public class ExpireAfterAccessTests + { + private readonly Duration expiry = Duration.FromMinutes(1); + private readonly ExpireAfterAccess expiryCalculator; + + public ExpireAfterAccessTests() + { + expiryCalculator = new(expiry.ToTimeSpan()); + } + + [Fact] + public void TimeToExpireReturnsCtorArg() + { + expiryCalculator.TimeToExpire.Should().Be(expiry.ToTimeSpan()); + } + + [Fact] + public void AfterCreateReturnsTimeToExpire() + { + expiryCalculator.GetExpireAfterCreate(1, 2).Should().Be(expiry); + } + + [Fact] + public void AfteReadReturnsTimeToExpire() + { + expiryCalculator.GetExpireAfterRead(1, 2, Duration.SinceEpoch()).Should().Be(expiry); + } + + [Fact] + public void AfteUpdateReturnsTimeToExpire() + { + expiryCalculator.GetExpireAfterUpdate(1, 2, Duration.SinceEpoch()).Should().Be(expiry); + } + } +} diff --git a/BitFaster.Caching.UnitTests/ExpireAfterWriteTests.cs b/BitFaster.Caching.UnitTests/ExpireAfterWriteTests.cs new file mode 100644 index 00000000..57774eed --- /dev/null +++ b/BitFaster.Caching.UnitTests/ExpireAfterWriteTests.cs @@ -0,0 +1,42 @@ +using System; +using FluentAssertions; +using Xunit; + +namespace BitFaster.Caching.UnitTests +{ + public class ExpireAfterWriteTests + { + private readonly Duration expiry = Duration.FromMinutes(1); + private readonly ExpireAfterWrite expiryCalculator; + + public ExpireAfterWriteTests() + { + expiryCalculator = new(expiry.ToTimeSpan()); + } + + [Fact] + public void TimeToExpireReturnsCtorArg() + { + expiryCalculator.TimeToExpire.Should().Be(expiry.ToTimeSpan()); + } + + [Fact] + public void AfterCreateReturnsTimeToExpire() + { + expiryCalculator.GetExpireAfterCreate(1, 2).Should().Be(expiry); + } + + [Fact] + public void AfteReadReturnsCurrentTimeToExpire() + { + var current = new Duration(123); + expiryCalculator.GetExpireAfterRead(1, 2, current).Should().Be(current); + } + + [Fact] + public void AfteUpdateReturnsTimeToExpire() + { + expiryCalculator.GetExpireAfterUpdate(1, 2, Duration.SinceEpoch()).Should().Be(expiry); + } + } +} diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuCoreTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuCoreTests.cs new file mode 100644 index 00000000..b4e97c7d --- /dev/null +++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuCoreTests.cs @@ -0,0 +1,202 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading.Tasks; +using BitFaster.Caching.Lfu; +using FluentAssertions; +using Xunit; + +namespace BitFaster.Caching.UnitTests.Lfu +{ + public abstract class ConcurrentLfuCoreTests + { + protected readonly TimeSpan timeToLive = TimeSpan.FromMilliseconds(200); + protected readonly int capacity = 20; + + private ConcurrentLfuTests.ValueFactory valueFactory = new(); + + private ICache lfu; + + public abstract ICache Create(); + public abstract void DoMaintenance(ICache cache); + + public ConcurrentLfuCoreTests() + { + lfu = Create(); + } + + [Fact] + public void EvictionPolicyCapacityReturnsCapacity() + { + lfu.Policy.Eviction.Value.Capacity.Should().Be(capacity); + } + + [Fact] + public void WhenKeyIsRequestedItIsCreatedAndCached() + { + var result1 = lfu.GetOrAdd(1, valueFactory.Create); + var result2 = lfu.GetOrAdd(1, valueFactory.Create); + + valueFactory.timesCalled.Should().Be(1); + result1.Should().Be(result2); + } +#if NETCOREAPP3_0_OR_GREATER + [Fact] + public void WhenKeyIsRequestedWithArgItIsCreatedAndCached() + { + var result1 = lfu.GetOrAdd(1, valueFactory.Create, 9); + var result2 = lfu.GetOrAdd(1, valueFactory.Create, 17); + + valueFactory.timesCalled.Should().Be(1); + result1.Should().Be(result2); + } +#endif + [Fact] + public async Task WhenKeyIsRequesteItIsCreatedAndCachedAsync() + { + var asyncLfu = lfu as IAsyncCache; + var result1 = await asyncLfu.GetOrAddAsync(1, valueFactory.CreateAsync); + var result2 = await asyncLfu.GetOrAddAsync(1, valueFactory.CreateAsync); + + valueFactory.timesCalled.Should().Be(1); + result1.Should().Be(result2); + } + +#if NETCOREAPP3_0_OR_GREATER + [Fact] + public async Task WhenKeyIsRequestedWithArgItIsCreatedAndCachedAsync() + { + var asyncLfu = lfu as IAsyncCache; + var result1 = await asyncLfu.GetOrAddAsync(1, valueFactory.CreateAsync, 9); + var result2 = await asyncLfu.GetOrAddAsync(1, valueFactory.CreateAsync, 17); + + valueFactory.timesCalled.Should().Be(1); + result1.Should().Be(result2); + } +#endif + + [Fact] + public void WhenItemIsUpdatedItIsUpdated() + { + lfu.GetOrAdd(1, k => k); + lfu.AddOrUpdate(1, 2); + + lfu.TryGet(1, out var value).Should().BeTrue(); + value.Should().Be(2); + } + + [Fact] + public void WhenItemDoesNotExistUpdatedAddsItem() + { + lfu.AddOrUpdate(1, 2); + + lfu.TryGet(1, out var value).Should().BeTrue(); + value.Should().Be(2); + } + + + [Fact] + public void WhenKeyExistsTryRemoveRemovesItem() + { + lfu.GetOrAdd(1, k => k); + + lfu.TryRemove(1).Should().BeTrue(); + lfu.TryGet(1, out _).Should().BeFalse(); + } + +#if NETCOREAPP3_0_OR_GREATER + [Fact] + public void WhenKeyExistsTryRemoveReturnsValue() + { + lfu.GetOrAdd(1, valueFactory.Create); + + lfu.TryRemove(1, out var value).Should().BeTrue(); + value.Should().Be(1); + } + + [Fact] + public void WhenItemExistsTryRemoveRemovesItem() + { + lfu.GetOrAdd(1, k => k); + + lfu.TryRemove(new KeyValuePair(1, 1)).Should().BeTrue(); + lfu.TryGet(1, out _).Should().BeFalse(); + } + + [Fact] + public void WhenItemDoesntMatchTryRemoveDoesNotRemove() + { + lfu.GetOrAdd(1, k => k); + + lfu.TryRemove(new KeyValuePair(1, 2)).Should().BeFalse(); + lfu.TryGet(1, out var value).Should().BeTrue(); + } +#endif + + [Fact] + public void WhenClearedCacheIsEmpty() + { + lfu.GetOrAdd(1, k => k); + lfu.GetOrAdd(2, k => k); + + lfu.Clear(); + + lfu.Count.Should().Be(0); + lfu.TryGet(1, out var _).Should().BeFalse(); + } + + [Fact] + public void TrimRemovesNItems() + { + for (int i = 0; i < 25; i++) + { + lfu.GetOrAdd(i, k => k); + } + DoMaintenance(lfu); + + lfu.Count.Should().Be(20); + + lfu.Policy.Eviction.Value.Trim(5); + DoMaintenance(lfu); + + lfu.Count.Should().Be(15); + } + + [Fact] + public void WhenItemsAddedGenericEnumerateContainsKvps() + { + lfu.GetOrAdd(1, k => k); + lfu.GetOrAdd(2, k => k); + + var enumerator = lfu.GetEnumerator(); + enumerator.MoveNext().Should().BeTrue(); + enumerator.Current.Should().Be(new KeyValuePair(1, 1)); + enumerator.MoveNext().Should().BeTrue(); + enumerator.Current.Should().Be(new KeyValuePair(2, 2)); + } + + [Fact] + public void WhenItemsAddedEnumerateContainsKvps() + { + lfu.GetOrAdd(1, k => k); + lfu.GetOrAdd(2, k => k); + + var enumerable = (IEnumerable)lfu; + enumerable.Should().BeEquivalentTo(new[] { new KeyValuePair(1, 1), new KeyValuePair(2, 2) }); + } + } + + public class ConcurrentTLfuWrapperTests : ConcurrentLfuCoreTests + { + public override ICache Create() + { + return new ConcurrentTLfu(capacity, new ExpireAfterWrite(timeToLive)); + } + + public override void DoMaintenance(ICache cache) + { + var tlfu = cache as ConcurrentTLfu; + tlfu?.DoMaintenance(); + } + } +} diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuSoakTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuSoakTests.cs new file mode 100644 index 00000000..1fa61d3b --- /dev/null +++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuSoakTests.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using BitFaster.Caching.Lfu; +using Xunit; +using Xunit.Abstractions; + +namespace BitFaster.Caching.UnitTests.Lfu +{ + [Collection("Soak")] + public class ConcurrentTLfuSoakTests + { + private const int soakIterations = 10; + private const int threads = 4; + private const int loopIterations = 100_000; + + private readonly ITestOutputHelper output; + + public ConcurrentTLfuSoakTests(ITestOutputHelper testOutputHelper) + { + this.output = testOutputHelper; + } + + [Theory] + [Repeat(soakIterations)] + public async Task GetOrAddWithExpiry(int iteration) + { + var lfu = new ConcurrentTLfu(20, new ExpireAfterWrite(TimeSpan.FromMilliseconds(10))); + + await Threaded.RunAsync(threads, async () => { + for (int i = 0; i < loopIterations; i++) + { + await lfu.GetOrAddAsync(i + 1, i => Task.FromResult(i.ToString())); + } + }); + + this.output.WriteLine($"iteration {iteration} keys={string.Join(" ", lfu.Keys)}"); + + // TODO: integrity check, including TimerWheel + } + } +} diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuTests.cs new file mode 100644 index 00000000..60b70079 --- /dev/null +++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentTLfuTests.cs @@ -0,0 +1,184 @@ +using System; +using System.Runtime.InteropServices; +using BitFaster.Caching.Lfu; +using BitFaster.Caching.Scheduler; +using FluentAssertions; +using Xunit; + +namespace BitFaster.Caching.UnitTests.Lfu +{ + // This could use foreground scheduler to make it more deterministic. + public class ConcurrentTLfuTests + { + private readonly TimeSpan timeToLive = TimeSpan.FromMilliseconds(200); + private readonly int capacity = 9; + private ConcurrentTLfu lfu; + + private Lru.ValueFactory valueFactory = new Lru.ValueFactory(); + + // on MacOS time measurement seems to be less stable, give longer pause + private int ttlWaitMlutiplier = RuntimeInformation.IsOSPlatform(OSPlatform.OSX) ? 8 : 2; + + public ConcurrentTLfuTests() + { + lfu = new ConcurrentTLfu(capacity, new ExpireAfterWrite(timeToLive)); + } + + [Fact] + public void ConstructAddAndRetrieveWithCustomComparerReturnsValue() + { + var lfu = new ConcurrentTLfu(9, 9, new NullScheduler(), StringComparer.OrdinalIgnoreCase, new ExpireAfterWrite(timeToLive)); + + lfu.GetOrAdd("foo", k => 1); + + lfu.TryGet("FOO", out var value).Should().BeTrue(); + value.Should().Be(1); + } + + [Fact] + public void MetricsHasValueIsTrue() + { + var x = new ConcurrentTLfu(3, new TestExpiryCalculator()); + x.Metrics.HasValue.Should().BeTrue(); + } + + [Fact] + public void EventsHasValueIsFalse() + { + var x = new ConcurrentTLfu(3, new TestExpiryCalculator()); + x.Events.HasValue.Should().BeFalse(); + } + + [Fact] + public void DefaultSchedulerIsThreadPool() + { + lfu.Scheduler.Should().BeOfType(); + } + + [Fact] + public void WhenCalculatorIsAfterWritePolicyIsAfterWrite() + { + lfu.Policy.ExpireAfterWrite.HasValue.Should().BeTrue(); + lfu.Policy.ExpireAfterWrite.Value.TimeToLive.Should().Be(timeToLive); + } + + [Fact] + public void WhenCalculatorIsAfterAccessPolicyIsAfterAccess() + { + lfu = new ConcurrentTLfu(capacity, new ExpireAfterAccess(timeToLive)); + + lfu.Policy.ExpireAfterAccess.HasValue.Should().BeTrue(); + lfu.Policy.ExpireAfterAccess.Value.TimeToLive.Should().Be(timeToLive); + } + + [Fact] + public void WhenCalculatorIsCustomPolicyIsAfter() + { + lfu = new ConcurrentTLfu(capacity, new TestExpiryCalculator()); + + lfu.Policy.ExpireAfter.HasValue.Should().BeTrue(); + (lfu as ITimePolicy).TimeToLive.Should().Be(TimeSpan.Zero); + } + + [Fact] + public void WhenKeyExistsTryGetTimeToExpireReturnsExpiry() + { + var calc = new TestExpiryCalculator(); + calc.ExpireAfterCreate = (k, v) => Duration.FromMinutes(1); + lfu = new ConcurrentTLfu(capacity, calc); + + lfu.GetOrAdd(1, k => "1"); + + lfu.Policy.ExpireAfter.Value.TryGetTimeToExpire(1, out var timeToExpire).Should().BeTrue(); + timeToExpire.Should().BeCloseTo(TimeSpan.FromMinutes(1), TimeSpan.FromMilliseconds(50)); + } + + [Fact] + public void WhenKeyDoesNotExistTryGetTimeToExpireReturnsFalse() + { + lfu = new ConcurrentTLfu(capacity, new TestExpiryCalculator()); + + lfu.Policy.ExpireAfter.Value.TryGetTimeToExpire(1, out _).Should().BeFalse(); + } + + [Fact] + public void WhenKeyTypeMismatchTryGetTimeToExpireReturnsFalse() + { + lfu = new ConcurrentTLfu(capacity, new TestExpiryCalculator()); + + lfu.Policy.ExpireAfter.Value.TryGetTimeToExpire("string", out _).Should().BeFalse(); + } + + // policy can expire after write + + [Fact] + public void WhenItemIsNotExpiredItIsNotRemoved() + { + lfu.GetOrAdd(1, valueFactory.Create); + + lfu.TryGet(1, out var value).Should().BeTrue(); + } + + [Fact] + public void WhenItemIsExpiredItIsRemoved() + { + Timed.Execute( + lfu, + lfu => + { + lfu.GetOrAdd(1, valueFactory.Create); + return lfu; + }, + timeToLive.MultiplyBy(ttlWaitMlutiplier), + lfu => + { + lfu.TryGet(1, out var value).Should().BeFalse(); + } + ); + } + + [Fact] + public void WhenItemIsExpiredItIsRemoved2() + { + Timed.Execute( + lfu, + lfu => + { + lfu.GetOrAdd(1, valueFactory.Create); + return lfu; + }, + TimeSpan.FromSeconds(2), + lfu => + { + // This is a bit flaky below 2 secs pause - seems like it doesnt always + // remove the item + lfu.Policy.ExpireAfterWrite.Value.TrimExpired(); + lfu.Count.Should().Be(0); + } + ); + } + + [Fact] + public void WhenItemIsUpdatedTtlIsExtended() + { + Timed.Execute( + lfu, + lfu => + { + lfu.GetOrAdd(1, valueFactory.Create); + return lfu; + }, + timeToLive.MultiplyBy(ttlWaitMlutiplier), + lfu => + { + lfu.TryUpdate(1, "3"); + + // If we defer computing time to the maintenance loop, we + // need to call maintenance here for the timestamp to be updated + lfu.DoMaintenance(); + lfu.TryGet(1, out var value).Should().BeTrue(); + } + ); + } + } +} diff --git a/BitFaster.Caching.UnitTests/Lfu/NodeMemoryLayoutDumps.cs b/BitFaster.Caching.UnitTests/Lfu/NodeMemoryLayoutDumps.cs new file mode 100644 index 00000000..3ed11a65 --- /dev/null +++ b/BitFaster.Caching.UnitTests/Lfu/NodeMemoryLayoutDumps.cs @@ -0,0 +1,96 @@ +using BitFaster.Caching.Lfu; +using ObjectLayoutInspector; +using Xunit; +using Xunit.Abstractions; + +namespace BitFaster.Caching.UnitTests.Lfu +{ + public class NodeMemoryLayoutDumps + { + private readonly ITestOutputHelper testOutputHelper; + + public NodeMemoryLayoutDumps(ITestOutputHelper testOutputHelper) + { + this.testOutputHelper = testOutputHelper; + } + + //Type layout for 'AccessOrderNode`2' + //Size: 48 bytes.Paddings: 2 bytes(%4 of empty space) + //|====================================================| + //| Object Header(8 bytes) | + //|----------------------------------------------------| + //| Method Table Ptr(8 bytes) | + //|====================================================| + //| 0-7: LfuNodeList`2 list(8 bytes) | + //|----------------------------------------------------| + //| 8-15: LfuNode`2 next(8 bytes) | + //|----------------------------------------------------| + //| 16-23: LfuNode`2 prev(8 bytes) | + //|----------------------------------------------------| + //| 24-31: Object Key(8 bytes) | + //|----------------------------------------------------| + //| 32-39: Object k__BackingField(8 bytes) | + //|----------------------------------------------------| + //| 40-43: Position k__BackingField(4 bytes) | + //| |===============================| | + //| | 0-3: Int32 value__(4 bytes) | | + //| |===============================| | + //|----------------------------------------------------| + //| 44: Boolean wasRemoved(1 byte) | + //|----------------------------------------------------| + //| 45: Boolean wasDeleted(1 byte) | + //|----------------------------------------------------| + //| 46-47: padding(2 bytes) | + //|====================================================| + [Fact] + public void DumpAccessOrderNode() + { + var layout = TypeLayout.GetLayout>(includePaddings: true); + testOutputHelper.WriteLine(layout.ToString()); + } + + //Type layout for 'TimeOrderNode`2' + //Size: 72 bytes.Paddings: 2 bytes(%2 of empty space) + //|====================================================| + //| Object Header(8 bytes) | + //|----------------------------------------------------| + //| Method Table Ptr(8 bytes) | + //|====================================================| + //| 0-7: LfuNodeList`2 list(8 bytes) | + //|----------------------------------------------------| + //| 8-15: LfuNode`2 next(8 bytes) | + //|----------------------------------------------------| + //| 16-23: LfuNode`2 prev(8 bytes) | + //|----------------------------------------------------| + //| 24-31: Object Key(8 bytes) | + //|----------------------------------------------------| + //| 32-39: Object k__BackingField(8 bytes) | + //|----------------------------------------------------| + //| 40-43: Position k__BackingField(4 bytes) | + //| |===============================| | + //| | 0-3: Int32 value__(4 bytes) | | + //| |===============================| | + //|----------------------------------------------------| + //| 44: Boolean wasRemoved(1 byte) | + //|----------------------------------------------------| + //| 45: Boolean wasDeleted(1 byte) | + //|----------------------------------------------------| + //| 46-47: padding(2 bytes) | + //|----------------------------------------------------| + //| 48-55: TimeOrderNode`2 prevTime(8 bytes) | + //|----------------------------------------------------| + //| 56-63: TimeOrderNode`2 nextTime(8 bytes) | + //|----------------------------------------------------| + //| 64-71: Duration timeToExpire(8 bytes) | + //| |===========================| | + //| | 0-7: Int64 raw(8 bytes) | | + //| |===========================| | + //|====================================================| + [Fact] + public void DumpTimeOrderNode() + { + var layout = TypeLayout.GetLayout>(includePaddings: true); + testOutputHelper.WriteLine(layout.ToString()); + } + } +} diff --git a/BitFaster.Caching.UnitTests/Lfu/TimerWheelTests.cs b/BitFaster.Caching.UnitTests/Lfu/TimerWheelTests.cs new file mode 100644 index 00000000..80fced34 --- /dev/null +++ b/BitFaster.Caching.UnitTests/Lfu/TimerWheelTests.cs @@ -0,0 +1,423 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Xml.Linq; +using BitFaster.Caching.Lfu; +using BitFaster.Caching.Lru; +using BitFaster.Caching.Scheduler; +using FluentAssertions; +using FluentAssertions.Common; +using Xunit; +using Xunit.Abstractions; + +namespace BitFaster.Caching.UnitTests.Lfu +{ + public class TimerWheelTests + { + private readonly ITestOutputHelper output; + + private readonly TimerWheel timerWheel; + private readonly WheelEnumerator wheelEnumerator; + private readonly LfuNodeList lfuNodeList; + private readonly ExpireAfterPolicy policy; + private ConcurrentLfuCore, ExpireAfterPolicy> cache; + + public TimerWheelTests(ITestOutputHelper testOutputHelper) + { + output = testOutputHelper; + lfuNodeList = new(); + timerWheel = new(); + wheelEnumerator = new(timerWheel, testOutputHelper); + policy = new ExpireAfterPolicy(new TestExpiryCalculator()); + cache = new( + Defaults.ConcurrencyLevel, 3, new ThreadPoolScheduler(), EqualityComparer.Default, () => { }, policy); + } + + [Theory] + [MemberData(nameof(ScheduleData))] + public void WhenAdvanceExpiredNodesExpire(long clock, Duration duration, int expiredCount) + { + var items = new List>(); + timerWheel.time = clock; + + foreach (int timeout in new int[] { 25, 90, 240 }) + { + var node = AddNode(1, new DisposeTracker(), new Duration(clock) + Duration.FromSeconds(timeout)); + items.Add(node); + timerWheel.Schedule(node); + } + + timerWheel.Advance(ref cache, new Duration(clock) + duration); + + var expired = items.Where(n => ((DisposeTracker)n.Value).Expired); + expired.Count().Should().Be(expiredCount); + + foreach (var node in expired) + { + node.GetTimestamp().Should().BeLessThanOrEqualTo(clock + duration.raw); + } + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenAdvancedPastItemExpiryItemIsEvicted(long clock2) + { + timerWheel.time = clock2; + + var item = new DisposeTracker(); + timerWheel.Schedule(AddNode(1, item, new Duration(clock2 + TimerWheel.Spans[0]))); + + timerWheel.Advance(ref cache, new Duration(clock2 + 13 * TimerWheel.Spans[0])); + + item.Expired.Should().BeTrue(); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenAdvanceDifferentWheelsNodeIsRescheduled(long clock) + { + var clockD = new Duration(clock); + timerWheel.time = clock; + + Duration t15 = clockD + Duration.FromSeconds(15); + Duration t120 = clockD + Duration.FromSeconds(120); + + timerWheel.Schedule(AddNode(15, new DisposeTracker(), t15)); // wheel 0 + timerWheel.Schedule(AddNode(120, new DisposeTracker(), t120)); // wheel 1 + + wheelEnumerator.Count().Should().Be(2); + var initialPosition = wheelEnumerator.PositionOf(120); + + Duration t45 = clockD + Duration.FromSeconds(45); // discard T15, T120 in wheel[1] + timerWheel.Advance(ref cache, t45); + + lfuNodeList.Count.Should().Be(1); // verify discarded T15 + wheelEnumerator.PositionOf(15).Should().Be(WheelPosition.None); + + Duration t110 = clockD + Duration.FromSeconds(110); + timerWheel.Advance(ref cache, t110); + + lfuNodeList.Count.Should().Be(1); // verify not discarded, T120 in wheel[0] + var rescheduledPosition = wheelEnumerator.PositionOf(120); + + rescheduledPosition.Should().BeLessThan(initialPosition); + + Duration t130 = clockD + Duration.FromSeconds(130); + timerWheel.Advance(ref cache, t130); + + lfuNodeList.Count.Should().Be(0); // verify discarded T120 + wheelEnumerator.PositionOf(120).Should().Be(WheelPosition.None); + } + + [Fact] + public void WhenAdvanceOverflowsAndItemIsExpiredItemIsEvicted() + { + timerWheel.time = -(TimerWheel.Spans[3] * 365) / 2; + var item = new DisposeTracker(); + timerWheel.Schedule(AddNode(1, item, new Duration(timerWheel.time + TimerWheel.Spans[0]))); + + timerWheel.Advance(ref cache, new Duration(timerWheel.time + (TimerWheel.Spans[3] * 365))); + + this.lfuNodeList.Count.Should().Be(0); + } + +#if NET6_0_OR_GREATER + [Theory] + [MemberData(nameof(ClockData))] + public void WhenAdvanceBackwardsNothingIsEvicted(long clock) + { + var random = new Random(); + timerWheel.time = clock; + + long max = Duration.FromMinutes(60 * 24 * 10).raw; + for (int i = 0; i < 1_000; i++) + { + long duration = random.NextInt64(max); + timerWheel.Schedule(AddNode(i, new DisposeTracker(), new Duration(clock + duration))); + } + + for (int i = 0; i < TimerWheel.Buckets.Length; i++) + { + timerWheel.Advance(ref cache, new Duration(clock - 3 * TimerWheel.Spans[i])); + } + + this.lfuNodeList.Count.Should().Be(1_000); + } +#endif + + [Fact] + public void WhenAdvanceThrowsCurrentTimeIsNotAdvanced() + { + Duration clock = Duration.SinceEpoch(); + timerWheel.time = clock.raw; + + timerWheel.Schedule(AddNode(1, new DisposeThrows(), new Duration(clock.raw + TimerWheel.Spans[1]))); + + // This should expire the node, call evict, then throw via DisposeThrows.Dispose() + Action advance = () => timerWheel.Advance(ref cache, new Duration(clock.raw + int.MaxValue)); + advance.Should().Throw(); + + timerWheel.time.Should().Be(clock.raw); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenEmptyGetExpirationDelayIsMax(long clock) + { + timerWheel.time = clock; + timerWheel.GetExpirationDelay().raw.Should().Be(long.MaxValue); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenScheduledMaxNodeIsInOuterWheel(long clock) + { + var clockD = new Duration(clock); + timerWheel.time = clock; + + Duration tMax = clockD + new Duration(long.MaxValue); + + timerWheel.Schedule(AddNode(1, new DisposeTracker(), tMax)); + + var initialPosition = wheelEnumerator.PositionOf(1); + initialPosition.wheel.Should().Be(4); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenScheduledInFirstWheelDelayIsUpdated(long clock) + { + timerWheel.time = clock; + + Duration delay = Duration.FromSeconds(1); + + timerWheel.Schedule(new TimeOrderNode(1, new DisposeTracker()) { TimeToExpire = new Duration(clock) + delay }); + + timerWheel.GetExpirationDelay().raw.Should().BeLessThanOrEqualTo(TimerWheel.Spans[0]); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenScheduledInLastWheelDelayIsUpdated(long clock) + { + timerWheel.time = clock; + + Duration delay = Duration.FromMinutes(60 * 24 * 14); + + timerWheel.Schedule(new TimeOrderNode(1, new DisposeTracker()) { TimeToExpire = new Duration(clock) + delay }); + + timerWheel.GetExpirationDelay().raw.Should().BeLessThanOrEqualTo(delay.raw); + } + + [Theory] + [MemberData(nameof(ClockData))] + public void WhenScheduledInDifferentWheelsDelayIsCorrect(long clock) + { + var clockD = new Duration(clock); + timerWheel.time = clock; + + Duration t15 = clockD + Duration.FromSeconds(15); + Duration t80 = clockD + Duration.FromSeconds(80); + + timerWheel.Schedule(AddNode(1, new DisposeTracker(), t15)); // wheel 0 + timerWheel.Schedule(AddNode(2, new DisposeTracker(), t80)); // wheel 1 + + Duration t45 = clockD + Duration.FromSeconds(45); // discard T15, T80 in wheel[1] + timerWheel.Advance(ref cache, t45); + + lfuNodeList.Count.Should().Be(1); // verify discarded + + Duration t95 = clockD + Duration.FromSeconds(95); + timerWheel.Schedule(AddNode(3, new DisposeTracker(), t95)); // wheel 0 + + Duration expectedDelay = (t80 - t45); + var delay = timerWheel.GetExpirationDelay(); + delay.raw.Should().BeLessThan(expectedDelay.raw + TimerWheel.Spans[0]); + } + + [Fact] + public void WhenScheduledThenDescheduledNodeIsRemoved() + { + var node = AddNode(1, new DisposeTracker(), Duration.SinceEpoch()); + + timerWheel.Schedule(node); + wheelEnumerator.PositionOf(1).Should().NotBe(WheelPosition.None); + + TimerWheel.Deschedule(node); + wheelEnumerator.PositionOf(1).Should().Be(WheelPosition.None); + node.GetNextInTimeOrder().Should().BeNull(); + node.GetPreviousInTimeOrder().Should().BeNull(); + } + + [Fact] + public void WhenRescheduledLaterNodeIsMoved() + { + var time = Duration.SinceEpoch(); + var node = AddNode(1, new DisposeTracker(), time); + + timerWheel.Schedule(node); + var initial = wheelEnumerator.PositionOf(1); + + node.TimeToExpire = time + Duration.FromMinutes(60 * 30); + timerWheel.Reschedule(node); + wheelEnumerator.PositionOf(1).Should().BeGreaterThan(initial); + } + + [Fact] + public void WhenDetachedRescheduleIsNoOp() + { + var time = Duration.SinceEpoch(); + var node = AddNode(1, new DisposeTracker(), time); + + timerWheel.Reschedule(node); + wheelEnumerator.PositionOf(1).Should().Be(WheelPosition.None); + } + + private TimeOrderNode AddNode(int key, IDisposable value, Duration timeToExpire) + { + var node = new TimeOrderNode(key, value) { TimeToExpire = timeToExpire }; + this.lfuNodeList.AddLast(node); + return node; + } + + public static IEnumerable ClockData => + new List + { + new object[] { long.MinValue }, + new object[] { -TimerWheel.Spans[0] + 1 }, + new object[] { 0L }, + new object[] { 0xfffffffc0000000L }, + new object[] { long.MaxValue - TimerWheel.Spans[0] + 1 }, + new object[] { long.MaxValue }, + }; + + public static IEnumerable ScheduleData = CreateSchedule(); + + private static IEnumerable CreateSchedule() + { + var schedule = new List(); + + foreach (var clock in ClockData) + { + schedule.Add(new object[] { clock.First(), Duration.FromSeconds(10), 0 }); + schedule.Add(new object[] { clock.First(), Duration.FromMinutes(3), 2 }); + schedule.Add(new object[] { clock.First(), Duration.FromMinutes(10), 3 }); + } + + return schedule; + } + } + + public class DisposeTracker : IDisposable + { + public bool Expired { get; set; } + + public void Dispose() + { + Expired = true; + } + } + + public class DisposeThrows : IDisposable + { + public void Dispose() + { + throw new InvalidOperationException(); + } + } + + internal class WheelEnumerator : IEnumerable>> + where K : notnull + { + private readonly TimerWheel timerWheel; + private readonly ITestOutputHelper testOutputHelper; + + public WheelEnumerator(TimerWheel timerWheel, ITestOutputHelper testOutputHelper) + { + this.timerWheel = timerWheel; + this.testOutputHelper = testOutputHelper; + } + + public void Dump(string tag = null) + { + this.testOutputHelper.WriteLine(tag); + int count = 0; + + foreach (KeyValuePair> kvp in this) + { + this.testOutputHelper.WriteLine($"[{kvp.Key.wheel},{kvp.Key.bucket}] {kvp.Value.Key}"); + count++; + } + + if (count == 0) + { + this.testOutputHelper.WriteLine("empty"); + } + } + + public WheelPosition PositionOf(K key) + { + var v = this.Where(kvp => EqualityComparer.Default.Equals(kvp.Value.Key, key)); + + if (v.Any()) + { + return v.First().Key; + } + + return WheelPosition.None; + } + + IEnumerator IEnumerable.GetEnumerator() + { + return ((WheelEnumerator)this).GetEnumerator(); + } + + public IEnumerator>> GetEnumerator() + { + for (int w = 0; w < timerWheel.wheels.Length; w++) + { + var wheel = timerWheel.wheels[w]; + + for (int b = 0; b < wheel.Length; b++) + { + var sentinel = wheel[b]; + var node = sentinel.GetNextInTimeOrder(); + + while (node != sentinel) + { + yield return new KeyValuePair>(new WheelPosition(w, b), node); + node = node.GetNextInTimeOrder(); + } + } + } + } + } + + internal struct WheelPosition : IComparable + { + public readonly int wheel; + public readonly int bucket; + + public static readonly WheelPosition None = new(-1, -1); + + public WheelPosition(int wheel, int bucket) + { + this.wheel = wheel; + this.bucket = bucket; + } + + public static bool operator >(WheelPosition a, WheelPosition b) => a.wheel > b.wheel | (a.wheel == b.wheel && a.bucket > b.bucket); + public static bool operator <(WheelPosition a, WheelPosition b) => a.wheel < b.wheel | (a.wheel == b.wheel && a.bucket < b.bucket); + + public int CompareTo(WheelPosition that) + { + if (this.wheel == that.wheel) + { + return this.bucket.CompareTo(that.bucket); + } + + return this.wheel.CompareTo(that.wheel); + } + } +} diff --git a/BitFaster.Caching.UnitTests/Timed.cs b/BitFaster.Caching.UnitTests/Timed.cs index 41ea9bc1..dca8916c 100644 --- a/BitFaster.Caching.UnitTests/Timed.cs +++ b/BitFaster.Caching.UnitTests/Timed.cs @@ -35,7 +35,11 @@ public static void Execute(TArg arg, Func first, Tim } Thread.Sleep(200); - attempts++.Should().BeLessThan(128, "Unable to run test within verification margin"); + + if (attempts++ > 128) + { + throw new Exception("Unable to run test within verification margin"); + } } } diff --git a/BitFaster.Caching/Duration.cs b/BitFaster.Caching/Duration.cs index aa7d21ab..47551b0d 100644 --- a/BitFaster.Caching/Duration.cs +++ b/BitFaster.Caching/Duration.cs @@ -23,6 +23,8 @@ public readonly struct Duration // this also avoids overflow when multipling long.MaxValue by 1.0 internal static readonly TimeSpan MaxRepresentable = TimeSpan.FromTicks((long)(long.MaxValue / 100.0d)); + internal static readonly Duration Zero = new Duration(0); + internal Duration(long raw) { this.raw = raw; @@ -116,5 +118,21 @@ public static Duration FromMinutes(double value) /// The subtrahend. /// An duration whose value is the result of the value of a minus the value of b. public static Duration operator -(Duration a, Duration b) => new Duration(a.raw - b.raw); + + /// + /// Returns a value that indicates whether a specified Duration is greater than another specified Duration. + /// + /// The first duration to compare. + /// The second duration to compare. + /// true if the value of a is greater than the value of b; otherwise, false. + public static bool operator >(Duration a, Duration b) => a.raw > b.raw; + + /// + /// Returns a value that indicates whether a specified Duration is less than another specified Duration. + /// + /// The first duration to compare. + /// The second duration to compare. + /// true if the value of a is less than the value of b; otherwise, false. + public static bool operator <(Duration a, Duration b) => a.raw < b.raw; } } diff --git a/BitFaster.Caching/ExpireAfterAccess.cs b/BitFaster.Caching/ExpireAfterAccess.cs new file mode 100644 index 00000000..f77b39a4 --- /dev/null +++ b/BitFaster.Caching/ExpireAfterAccess.cs @@ -0,0 +1,31 @@ +using System; + +namespace BitFaster.Caching +{ + internal sealed class ExpireAfterAccess : IExpiryCalculator + { + private readonly Duration timeToExpire; + + public TimeSpan TimeToExpire => timeToExpire.ToTimeSpan(); + + public ExpireAfterAccess(TimeSpan timeToExpire) + { + this.timeToExpire = Duration.FromTimeSpan(timeToExpire); + } + + public Duration GetExpireAfterCreate(K key, V value) + { + return timeToExpire; + } + + public Duration GetExpireAfterRead(K key, V value, Duration current) + { + return timeToExpire; + } + + public Duration GetExpireAfterUpdate(K key, V value, Duration current) + { + return timeToExpire; + } + } +} diff --git a/BitFaster.Caching/ExpireAfterWrite.cs b/BitFaster.Caching/ExpireAfterWrite.cs new file mode 100644 index 00000000..9f10793f --- /dev/null +++ b/BitFaster.Caching/ExpireAfterWrite.cs @@ -0,0 +1,31 @@ +using System; + +namespace BitFaster.Caching +{ + internal sealed class ExpireAfterWrite : IExpiryCalculator + { + private readonly Duration timeToExpire; + + public TimeSpan TimeToExpire => timeToExpire.ToTimeSpan(); + + public ExpireAfterWrite(TimeSpan timeToExpire) + { + this.timeToExpire = Duration.FromTimeSpan(timeToExpire); + } + + public Duration GetExpireAfterCreate(K key, V value) + { + return timeToExpire; + } + + public Duration GetExpireAfterRead(K key, V value, Duration current) + { + return current; + } + + public Duration GetExpireAfterUpdate(K key, V value, Duration current) + { + return timeToExpire; + } + } +} diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index a3910201..9ff0fe05 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -1,249 +1,249 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Threading.Tasks; -using BitFaster.Caching.Buffers; -using BitFaster.Caching.Lru; -using BitFaster.Caching.Scheduler; - -namespace BitFaster.Caching.Lfu -{ - /// - /// An approximate LFU based on the W-TinyLfu eviction policy. W-TinyLfu tracks items using a window LRU list, and - /// a main space LRU divided into protected and probation segments. Reads and writes to the cache are stored in buffers - /// and later applied to the policy LRU lists in batches under a lock. Each read and write is tracked using a compact - /// popularity sketch to probalistically estimate item frequency. Items proceed through the LRU lists as follows: - /// - /// New items are added to the window LRU. When acessed window items move to the window MRU position. - /// When the window is full, candidate items are moved to the probation segment in LRU order. - /// When the main space is full, the access frequency of each window candidate is compared - /// to probation victims in LRU order. The item with the lowest frequency is evicted until the cache size is within bounds. - /// When a probation item is accessed, it is moved to the protected segment. If the protected segment is full, - /// the LRU protected item is demoted to probation. - /// When a protected item is accessed, it is moved to the protected MRU position. - /// - /// The size of the admission window and main space are adapted over time to iteratively improve hit rate using a - /// hill climbing algorithm. A larger window favors workloads with high recency bias, whereas a larger main space - /// favors workloads with frequency bias. - /// - /// Based on the Caffeine library by ben.manes@gmail.com (Ben Manes). - /// https://github.com/ben-manes/caffeine - [DebuggerTypeProxy(typeof(ConcurrentLfu<,>.LfuDebugView<>))] - [DebuggerDisplay("Count = {Count}/{Capacity}")] - public sealed class ConcurrentLfu : ICacheExt, IAsyncCache, IBoundedPolicy - where K : notnull - { - // Note: for performance reasons this is a mutable struct, it cannot be readonly. - private ConcurrentLfuCore, AccessOrderPolicy> core; - - /// - /// The default buffer size. - /// - public const int DefaultBufferSize = 128; - - /// - /// Initializes a new instance of the ConcurrentLfu class with the specified capacity. - /// - /// The capacity. - public ConcurrentLfu(int capacity) - { - this.core = new(Defaults.ConcurrencyLevel, capacity, new ThreadPoolScheduler(), EqualityComparer.Default, () => this.DrainBuffers()); - } - - /// - /// Initializes a new instance of the ConcurrentLfu class with the specified concurrencyLevel, capacity, scheduler, equality comparer and buffer size. - /// - /// The concurrency level. - /// The capacity. - /// The scheduler. - /// The equality comparer. - public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler, IEqualityComparer comparer) - { - this.core = new(concurrencyLevel, capacity, scheduler, comparer, () => this.DrainBuffers()); - } - - internal ConcurrentLfuCore, AccessOrderPolicy> Core => core; - - // structs cannot declare self referencing lambda functions, therefore pass this in from the ctor - private void DrainBuffers() - { - this.core.DrainBuffers(); - } - - /// - public int Count => core.Count; - - /// - public Optional Metrics => core.Metrics; - - /// - public Optional> Events => Optional>.None(); - - /// - public CachePolicy Policy => core.Policy; - - /// - public ICollection Keys => core.Keys; - - /// - public int Capacity => core.Capacity; - - /// - public IScheduler Scheduler => core.Scheduler; - - /// - /// Synchronously perform all pending policy maintenance. Drain the read and write buffers then - /// use the eviction policy to preserve bounded size and remove expired items. - /// - /// - /// Note: maintenance is automatically performed asynchronously immediately following a read or write. - /// It is not necessary to call this method, is provided purely to enable tests to reach a consistent state. - /// - public void DoMaintenance() - { - core.DoMaintenance(); - } - - /// - public void AddOrUpdate(K key, V value) - { - core.AddOrUpdate(key, value); - } - - /// - public void Clear() - { - core.Clear(); - } - - /// - public V GetOrAdd(K key, Func valueFactory) - { - return core.GetOrAdd(key, valueFactory); - } - - /// - public V GetOrAdd(K key, Func valueFactory, TArg factoryArgument) - { - return core.GetOrAdd(key, valueFactory, factoryArgument); - } - - /// - public ValueTask GetOrAddAsync(K key, Func> valueFactory) - { - return core.GetOrAddAsync(key, valueFactory); - } - - /// - public ValueTask GetOrAddAsync(K key, Func> valueFactory, TArg factoryArgument) - { - return core.GetOrAddAsync(key, valueFactory, factoryArgument); - } - - /// - public void Trim(int itemCount) - { - core.Trim(itemCount); - } - - /// - public bool TryGet(K key, [MaybeNullWhen(false)] out V value) - { - return core.TryGet(key, out value); - } - - /// - public bool TryRemove(K key) - { - return core.TryRemove(key); - } - - /// - /// Attempts to remove the specified key value pair. - /// - /// The item to remove. - /// true if the item was removed successfully; otherwise, false. - public bool TryRemove(KeyValuePair item) - { - return core.TryRemove(item); - } - - /// - /// Attempts to remove and return the value that has the specified key. - /// - /// The key of the element to remove. - /// When this method returns, contains the object removed, or the default value of the value type if key does not exist. - /// true if the object was removed successfully; otherwise, false. - public bool TryRemove(K key, [MaybeNullWhen(false)] out V value) - { - return core.TryRemove(key, out value); - } - - /// - public bool TryUpdate(K key, V value) - { - return core.TryUpdate(key, value); - } - - /// - public IEnumerator> GetEnumerator() - { - return core.GetEnumerator(); - } - - /// - IEnumerator IEnumerable.GetEnumerator() - { - return core.GetEnumerator(); - } - -#if DEBUG - /// - /// Format the LFU as a string by converting all the keys to strings. - /// - /// The LFU formatted as a string. - public string FormatLfuString() - { - return core.FormatLfuString(); - } -#endif - - [ExcludeFromCodeCoverage] - internal class LfuDebugView - where N : LfuNode - { - private readonly ConcurrentLfu lfu; - - public LfuDebugView(ConcurrentLfu lfu) - { - this.lfu = lfu; - } - - public string Maintenance => lfu.core.drainStatus.Format(); - - public ICacheMetrics? Metrics => lfu.Metrics.Value; - - public StripedMpscBuffer ReadBuffer => (this.lfu.core.readBuffer as StripedMpscBuffer)!; - - public MpscBoundedBuffer WriteBuffer => (this.lfu.core.writeBuffer as MpscBoundedBuffer)!; - - public KeyValuePair[] Items - { - get - { - var items = new KeyValuePair[lfu.Count]; - - int index = 0; - foreach (var kvp in lfu) - { - items[index++] = kvp; - } - return items; - } - } - } - } - -} +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; +using BitFaster.Caching.Buffers; +using BitFaster.Caching.Lru; +using BitFaster.Caching.Scheduler; + +namespace BitFaster.Caching.Lfu +{ + /// + /// An approximate LFU based on the W-TinyLfu eviction policy. W-TinyLfu tracks items using a window LRU list, and + /// a main space LRU divided into protected and probation segments. Reads and writes to the cache are stored in buffers + /// and later applied to the policy LRU lists in batches under a lock. Each read and write is tracked using a compact + /// popularity sketch to probalistically estimate item frequency. Items proceed through the LRU lists as follows: + /// + /// New items are added to the window LRU. When acessed window items move to the window MRU position. + /// When the window is full, candidate items are moved to the probation segment in LRU order. + /// When the main space is full, the access frequency of each window candidate is compared + /// to probation victims in LRU order. The item with the lowest frequency is evicted until the cache size is within bounds. + /// When a probation item is accessed, it is moved to the protected segment. If the protected segment is full, + /// the LRU protected item is demoted to probation. + /// When a protected item is accessed, it is moved to the protected MRU position. + /// + /// The size of the admission window and main space are adapted over time to iteratively improve hit rate using a + /// hill climbing algorithm. A larger window favors workloads with high recency bias, whereas a larger main space + /// favors workloads with frequency bias. + /// + /// Based on the Caffeine library by ben.manes@gmail.com (Ben Manes). + /// https://github.com/ben-manes/caffeine + [DebuggerTypeProxy(typeof(ConcurrentLfu<,>.LfuDebugView<>))] + [DebuggerDisplay("Count = {Count}/{Capacity}")] + public sealed class ConcurrentLfu : ICacheExt, IAsyncCache, IBoundedPolicy + where K : notnull + { + // Note: for performance reasons this is a mutable struct, it cannot be readonly. + private ConcurrentLfuCore, AccessOrderPolicy> core; + + /// + /// The default buffer size. + /// + public const int DefaultBufferSize = 128; + + /// + /// Initializes a new instance of the ConcurrentLfu class with the specified capacity. + /// + /// The capacity. + public ConcurrentLfu(int capacity) + { + this.core = new(Defaults.ConcurrencyLevel, capacity, new ThreadPoolScheduler(), EqualityComparer.Default, () => this.DrainBuffers(), default); + } + + /// + /// Initializes a new instance of the ConcurrentLfu class with the specified concurrencyLevel, capacity, scheduler, equality comparer and buffer size. + /// + /// The concurrency level. + /// The capacity. + /// The scheduler. + /// The equality comparer. + public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler, IEqualityComparer comparer) + { + this.core = new(concurrencyLevel, capacity, scheduler, comparer, () => this.DrainBuffers(), default); + } + + internal ConcurrentLfuCore, AccessOrderPolicy> Core => core; + + // structs cannot declare self referencing lambda functions, therefore pass this in from the ctor + private void DrainBuffers() + { + this.core.DrainBuffers(); + } + + /// + public int Count => core.Count; + + /// + public Optional Metrics => core.Metrics; + + /// + public Optional> Events => Optional>.None(); + + /// + public CachePolicy Policy => core.Policy; + + /// + public ICollection Keys => core.Keys; + + /// + public int Capacity => core.Capacity; + + /// + public IScheduler Scheduler => core.Scheduler; + + /// + /// Synchronously perform all pending policy maintenance. Drain the read and write buffers then + /// use the eviction policy to preserve bounded size and remove expired items. + /// + /// + /// Note: maintenance is automatically performed asynchronously immediately following a read or write. + /// It is not necessary to call this method, is provided purely to enable tests to reach a consistent state. + /// + public void DoMaintenance() + { + core.DoMaintenance(); + } + + /// + public void AddOrUpdate(K key, V value) + { + core.AddOrUpdate(key, value); + } + + /// + public void Clear() + { + core.Clear(); + } + + /// + public V GetOrAdd(K key, Func valueFactory) + { + return core.GetOrAdd(key, valueFactory); + } + + /// + public V GetOrAdd(K key, Func valueFactory, TArg factoryArgument) + { + return core.GetOrAdd(key, valueFactory, factoryArgument); + } + + /// + public ValueTask GetOrAddAsync(K key, Func> valueFactory) + { + return core.GetOrAddAsync(key, valueFactory); + } + + /// + public ValueTask GetOrAddAsync(K key, Func> valueFactory, TArg factoryArgument) + { + return core.GetOrAddAsync(key, valueFactory, factoryArgument); + } + + /// + public void Trim(int itemCount) + { + core.Trim(itemCount); + } + + /// + public bool TryGet(K key, [MaybeNullWhen(false)] out V value) + { + return core.TryGet(key, out value); + } + + /// + public bool TryRemove(K key) + { + return core.TryRemove(key); + } + + /// + /// Attempts to remove the specified key value pair. + /// + /// The item to remove. + /// true if the item was removed successfully; otherwise, false. + public bool TryRemove(KeyValuePair item) + { + return core.TryRemove(item); + } + + /// + /// Attempts to remove and return the value that has the specified key. + /// + /// The key of the element to remove. + /// When this method returns, contains the object removed, or the default value of the value type if key does not exist. + /// true if the object was removed successfully; otherwise, false. + public bool TryRemove(K key, [MaybeNullWhen(false)] out V value) + { + return core.TryRemove(key, out value); + } + + /// + public bool TryUpdate(K key, V value) + { + return core.TryUpdate(key, value); + } + + /// + public IEnumerator> GetEnumerator() + { + return core.GetEnumerator(); + } + + /// + IEnumerator IEnumerable.GetEnumerator() + { + return core.GetEnumerator(); + } + +#if DEBUG + /// + /// Format the LFU as a string by converting all the keys to strings. + /// + /// The LFU formatted as a string. + public string FormatLfuString() + { + return core.FormatLfuString(); + } +#endif + + [ExcludeFromCodeCoverage] + internal class LfuDebugView + where N : LfuNode + { + private readonly ConcurrentLfu lfu; + + public LfuDebugView(ConcurrentLfu lfu) + { + this.lfu = lfu; + } + + public string Maintenance => lfu.core.drainStatus.Format(); + + public ICacheMetrics? Metrics => lfu.Metrics.Value; + + public StripedMpscBuffer ReadBuffer => (this.lfu.core.readBuffer as StripedMpscBuffer)!; + + public MpscBoundedBuffer WriteBuffer => (this.lfu.core.writeBuffer as MpscBoundedBuffer)!; + + public KeyValuePair[] Items + { + get + { + var items = new KeyValuePair[lfu.Count]; + + int index = 0; + foreach (var kvp in lfu) + { + items[index++] = kvp; + } + return items; + } + } + } + } + +} diff --git a/BitFaster.Caching/Lfu/ConcurrentLfuCore.cs b/BitFaster.Caching/Lfu/ConcurrentLfuCore.cs index a6679d11..0b56b09d 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfuCore.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfuCore.cs @@ -71,7 +71,9 @@ internal struct ConcurrentLfuCore : IBoundedPolicy private readonly N[] drainBuffer; - public ConcurrentLfuCore(int concurrencyLevel, int capacity, IScheduler scheduler, IEqualityComparer comparer, Action drainBuffers) + internal P policy; + + public ConcurrentLfuCore(int concurrencyLevel, int capacity, IScheduler scheduler, IEqualityComparer comparer, Action drainBuffers, P policy) { if (capacity < 3) Throw.ArgOutOfRange(nameof(capacity)); @@ -99,6 +101,8 @@ public ConcurrentLfuCore(int concurrencyLevel, int capacity, IScheduler schedule this.drainBuffer = new N[this.readBuffer.Capacity]; this.drainBuffers = drainBuffers; + + this.policy = policy; } // No lock count: https://arbel.net/2013/02/03/best-practices-for-using-concurrentdictionary/ @@ -123,7 +127,7 @@ public void AddOrUpdate(K key, V value) return; } - var node = default(P).Create(key, value); + var node = policy.Create(key, value); if (this.dictionary.TryAdd(key, node)) { AfterWrite(node); @@ -174,7 +178,7 @@ public void Trim(int itemCount) private bool TryAdd(K key, V value) { - var node = default(P).Create(key, value); + var node = policy.Create(key, value); if (this.dictionary.TryAdd(key, node)) { @@ -254,24 +258,58 @@ public async ValueTask GetOrAddAsync(K key, Func> valu } } - public bool TryGet(K key, [MaybeNullWhen(false)] out V value) + public bool TryGet(K key, [MaybeNullWhen(false)] out V value) + { + return TryGetImpl(key, out value); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool TryGetImpl(K key, [MaybeNullWhen(false)] out V value) { if (this.dictionary.TryGetValue(key, out var node)) { - bool delayable = this.readBuffer.TryAdd(node) != BufferStatus.Full; - - if (this.drainStatus.ShouldDrain(delayable)) - { - TryScheduleDrain(); + if (!policy.IsExpired(node)) + { + bool delayable = this.readBuffer.TryAdd(node) != BufferStatus.Full; + + if (this.drainStatus.ShouldDrain(delayable)) + { + TryScheduleDrain(); + } + value = node.Value; + return true; + } + else + { + // expired case, immediately remove from the dictionary + TryRemove(node); } - value = node.Value; - return true; } this.metrics.requestMissCount.Increment(); value = default; return false; + } + + internal bool TryGetNode(K key, [MaybeNullWhen(false)] out N node) + { + return this.dictionary.TryGetValue(key, out node); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + void TryRemove(N node) + { +#if NET6_0_OR_GREATER + if (this.dictionary.TryRemove(new KeyValuePair(node.Key, node))) +#else + // https://devblogs.microsoft.com/pfxteam/little-known-gems-atomic-conditional-removals-from-concurrentdictionary/ + if (((ICollection>)this.dictionary).Remove(new KeyValuePair(node.Key, node))) +#endif + { + node.WasRemoved = true; + AfterWrite(node); + } } public bool TryRemove(KeyValuePair item) @@ -487,6 +525,8 @@ private bool Maintenance(N? droppedWrite = null) { this.drainStatus.VolatileWrite(DrainStatus.ProcessingToIdle); + policy.AdvanceTime(); + // Note: this is only Span on .NET Core 3.1+, else this is no-op and it is still an array var buffer = this.drainBuffer.AsSpanOrArray(); @@ -519,6 +559,7 @@ private bool Maintenance(N? droppedWrite = null) done = true; } + policy.ExpireEntries(ref this); EvictEntries(); this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); ReFitProtected(); @@ -536,7 +577,7 @@ private bool Maintenance(N? droppedWrite = null) return done; } - private void OnAccess(LfuNode node) + private void OnAccess(N node) { // there was a cache hit even if the item was removed or is not yet added. this.metrics.requestHitCount++; @@ -559,9 +600,11 @@ private void OnAccess(LfuNode node) this.protectedLru.MoveToEnd(node); break; } + + policy.OnRead(node); } - private void OnWrite(LfuNode node) + private void OnWrite(N node) { // Nodes can be removed while they are in the write buffer, in which case they should // not be added back into the LRU. @@ -606,6 +649,8 @@ private void OnWrite(LfuNode node) this.metrics.updatedCount++; break; } + + policy.OnWrite(node); } private void PromoteProbation(LfuNode node) @@ -759,7 +804,7 @@ private bool AdmitCandidate(K candidateKey, K victimKey) return candidateFreq > victimFreq; } - private void Evict(LfuNode evictee) + internal void Evict(LfuNode evictee) { evictee.WasRemoved = true; evictee.WasDeleted = true; @@ -772,9 +817,11 @@ private void Evict(LfuNode evictee) #else ((ICollection>)this.dictionary).Remove(kvp); #endif - evictee.list.Remove(evictee); + evictee.list?.Remove(evictee); Disposer.Dispose(evictee.Value); this.metrics.evictedCount++; + + this.policy.OnEvict((N)evictee); } private void ReFitProtected() diff --git a/BitFaster.Caching/Lfu/ConcurrentTLfu.cs b/BitFaster.Caching/Lfu/ConcurrentTLfu.cs new file mode 100644 index 00000000..409f81a1 --- /dev/null +++ b/BitFaster.Caching/Lfu/ConcurrentTLfu.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; +using BitFaster.Caching.Lru; +using BitFaster.Caching.Scheduler; + +namespace BitFaster.Caching.Lfu +{ + // LFU with time-based expiry policy. Provided as a wrapper around ConcurrentLfuCore to hide generic item and policy. + internal sealed class ConcurrentTLfu : ICache, IAsyncCache, IBoundedPolicy, ITimePolicy, IDiscreteTimePolicy + where K : notnull + { + // Note: for performance reasons this is a mutable struct, it cannot be readonly. + private ConcurrentLfuCore, ExpireAfterPolicy> core; + + public ConcurrentTLfu(int capacity, IExpiryCalculator expiryCalculator) + { + this.core = new(Defaults.ConcurrencyLevel, capacity, new ThreadPoolScheduler(), EqualityComparer.Default, () => this.DrainBuffers(), new(expiryCalculator)); + } + + public ConcurrentTLfu(int concurrencyLevel, int capacity, IScheduler scheduler, IEqualityComparer comparer, IExpiryCalculator expiryCalculator) + { + this.core = new(concurrencyLevel, capacity, scheduler, comparer, () => this.DrainBuffers(), new(expiryCalculator)); + } + + // structs cannot declare self referencing lambda functions, therefore pass this in from the ctor + private void DrainBuffers() + { + this.core.DrainBuffers(); + } + + /// + public int Count => core.Count; + + /// + public Optional Metrics => core.Metrics; + + /// + public Optional> Events => Optional>.None(); + + /// + public CachePolicy Policy => CreatePolicy(); + + /// + public ICollection Keys => core.Keys; + + /// + public int Capacity => core.Capacity; + + /// + public IScheduler Scheduler => core.Scheduler; + + public void DoMaintenance() + { + core.DoMaintenance(); + } + + /// + public void AddOrUpdate(K key, V value) + { + core.AddOrUpdate(key, value); + } + + /// + public void Clear() + { + core.Clear(); + } + + /// + public V GetOrAdd(K key, Func valueFactory) + { + return core.GetOrAdd(key, valueFactory); + } + + /// + public V GetOrAdd(K key, Func valueFactory, TArg factoryArgument) + { + return core.GetOrAdd(key, valueFactory, factoryArgument); + } + + /// + public ValueTask GetOrAddAsync(K key, Func> valueFactory) + { + return core.GetOrAddAsync(key, valueFactory); + } + + /// + public ValueTask GetOrAddAsync(K key, Func> valueFactory, TArg factoryArgument) + { + return core.GetOrAddAsync(key, valueFactory, factoryArgument); + } + + /// + public void Trim(int itemCount) + { + core.Trim(itemCount); + } + + /// + public bool TryGet(K key, [MaybeNullWhen(false)] out V value) + { + return core.TryGet(key, out value); + } + + /// + public bool TryRemove(K key) + { + return core.TryRemove(key); + } + + public bool TryRemove(KeyValuePair item) + { + return core.TryRemove(item); + } + + public bool TryRemove(K key, [MaybeNullWhen(false)] out V value) + { + return core.TryRemove(key, out value); + } + + /// + public bool TryUpdate(K key, V value) + { + return core.TryUpdate(key, value); + } + + /// + public IEnumerator> GetEnumerator() + { + return core.GetEnumerator(); + } + + /// + IEnumerator IEnumerable.GetEnumerator() + { + return core.GetEnumerator(); + } + + private CachePolicy CreatePolicy() + { + var afterWrite = Optional.None(); + var afterAccess = Optional.None(); + var afterCustom = Optional.None(); + + var calc = core.policy.ExpiryCalculator; + + switch (calc) + { + case ExpireAfterAccess: + afterAccess = new Optional(this); + break; + case ExpireAfterWrite: + afterWrite = new Optional(this); + break; + default: + afterCustom = new Optional(this); + break; + }; + + return new CachePolicy(new Optional(this), afterWrite, afterAccess, afterCustom); + } + + TimeSpan ITimePolicy.TimeToLive => (this.core.policy.ExpiryCalculator) switch + { + ExpireAfterAccess aa => aa.TimeToExpire, + ExpireAfterWrite aw => aw.TimeToExpire, + _ => TimeSpan.Zero, + }; + + /// + public bool TryGetTimeToExpire(K1 key, out TimeSpan timeToExpire) + { + if (key is K k && core.TryGetNode(k, out TimeOrderNode? node)) + { + var tte = new Duration(node.GetTimestamp()) - Duration.SinceEpoch(); + timeToExpire = tte.ToTimeSpan(); + return true; + } + + timeToExpire = default; + return false; + } + + /// + public void TrimExpired() + { + DoMaintenance(); + } + } +} diff --git a/BitFaster.Caching/Lfu/LfuNode.cs b/BitFaster.Caching/Lfu/LfuNode.cs index fbe8b271..ed80f61b 100644 --- a/BitFaster.Caching/Lfu/LfuNode.cs +++ b/BitFaster.Caching/Lfu/LfuNode.cs @@ -1,74 +1,125 @@ -#nullable disable -namespace BitFaster.Caching.Lfu -{ - internal class LfuNode - { - internal LfuNodeList list; - internal LfuNode next; - internal LfuNode prev; - - private volatile bool wasRemoved; - private volatile bool wasDeleted; - - public LfuNode(K k, V v) - { - this.Key = k; - this.Value = v; - } - - public readonly K Key; - - public V Value { get; set; } - - public Position Position { get; set; } - - /// - /// Node was removed from the dictionary, but is still present in the LRU lists. - /// - public bool WasRemoved - { - get => this.wasRemoved; - set => this.wasRemoved = value; - } - - /// - /// Node has been removed both from the dictionary and the LRU lists. - /// - public bool WasDeleted - { - get => this.wasDeleted; - set => this.wasDeleted = value; - } - - public LfuNode Next - { - get { return next == null || next == list.head ? null : next; } - } - - public LfuNode Previous - { - get { return prev == null || this == list.head ? null : prev; } - } - - internal void Invalidate() - { - list = null; - next = null; - prev = null; - } - } - - internal enum Position - { - Window, - Probation, - Protected, - } - - internal sealed class AccessOrderNode : LfuNode - { - public AccessOrderNode(K k, V v) : base(k, v) - { - } - } -} +#nullable disable +namespace BitFaster.Caching.Lfu +{ + internal class LfuNode + { + internal LfuNodeList list; + internal LfuNode next; + internal LfuNode prev; + + private volatile bool wasRemoved; + private volatile bool wasDeleted; + + public LfuNode(K k, V v) + { + this.Key = k; + this.Value = v; + } + + public readonly K Key; + + public V Value { get; set; } + + public Position Position { get; set; } + + /// + /// Node was removed from the dictionary, but is still present in the LRU lists. + /// + public bool WasRemoved + { + get => this.wasRemoved; + set => this.wasRemoved = value; + } + + /// + /// Node has been removed both from the dictionary and the LRU lists. + /// + public bool WasDeleted + { + get => this.wasDeleted; + set => this.wasDeleted = value; + } + + public LfuNode Next + { + get { return next == null || next == list.head ? null : next; } + } + + public LfuNode Previous + { + get { return prev == null || this == list.head ? null : prev; } + } + + internal void Invalidate() + { + list = null; + next = null; + prev = null; + } + } + + internal enum Position + { + Window, + Probation, + Protected, + } + + internal sealed class AccessOrderNode : LfuNode + { + public AccessOrderNode(K k, V v) : base(k, v) + { + } + } + + internal sealed class TimeOrderNode : LfuNode + where K : notnull + { + TimeOrderNode prevTime; + TimeOrderNode nextTime; + + private Duration timeToExpire; + + public TimeOrderNode(K k, V v) : base(k, v) + { + } + + public Duration TimeToExpire + { + get => timeToExpire; + set => timeToExpire = value; + } + + public static TimeOrderNode CreateSentinel() + { + var s = new TimeOrderNode(default, default); + s.nextTime = s.prevTime = s; + return s; + } + + public TimeOrderNode GetPreviousInTimeOrder() + { + return prevTime; + } + + public long GetTimestamp() + { + return timeToExpire.raw; + } + + public void SetPreviousInTimeOrder(TimeOrderNode prev) + { + this.prevTime = prev; + } + + public TimeOrderNode GetNextInTimeOrder() + { + return nextTime; + } + + public void SetNextInTimeOrder(TimeOrderNode next) + { + this.nextTime = next; + } + } +} diff --git a/BitFaster.Caching/Lfu/NodePolicy.cs b/BitFaster.Caching/Lfu/NodePolicy.cs index 5e3a21f8..f2e89261 100644 --- a/BitFaster.Caching/Lfu/NodePolicy.cs +++ b/BitFaster.Caching/Lfu/NodePolicy.cs @@ -1,22 +1,134 @@ using System; using System.Collections.Generic; using System.Runtime.CompilerServices; -using System.Text; namespace BitFaster.Caching.Lfu { internal interface INodePolicy + where K : notnull where N : LfuNode { N Create(K key, V value); + bool IsExpired(N node); + void AdvanceTime(); + void OnRead(N node); + void OnWrite(N node); + void OnEvict(N node); + void ExpireEntries

(ref ConcurrentLfuCore cache) where P : struct, INodePolicy; } internal struct AccessOrderPolicy : INodePolicy> + where K : notnull { [MethodImpl(MethodImplOptions.AggressiveInlining)] public AccessOrderNode Create(K key, V value) { return new AccessOrderNode(key, value); } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool IsExpired(AccessOrderNode node) + { + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void AdvanceTime() + { + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnRead(AccessOrderNode node) + { + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnWrite(AccessOrderNode node) + { + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnEvict(AccessOrderNode node) + { + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ExpireEntries

(ref ConcurrentLfuCore, P> cache) where P : struct, INodePolicy> + { + } + } + + internal struct ExpireAfterPolicy : INodePolicy> + where K : notnull + { + private readonly IExpiryCalculator expiryCalculator; + private readonly TimerWheel wheel; + private Duration current; + + public ExpireAfterPolicy(IExpiryCalculator expiryCalculator) + { + this.wheel = new TimerWheel(); + this.expiryCalculator = expiryCalculator; + this.current = Duration.SinceEpoch(); + this.wheel.time = current.raw; + } + + public IExpiryCalculator ExpiryCalculator => expiryCalculator; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public TimeOrderNode Create(K key, V value) + { + var expiry = expiryCalculator.GetExpireAfterCreate(key, value); + return new TimeOrderNode(key, value) { TimeToExpire = Duration.SinceEpoch() + expiry }; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool IsExpired(TimeOrderNode node) + { + return node.TimeToExpire < Duration.SinceEpoch(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void AdvanceTime() + { + current = Duration.SinceEpoch(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnRead(TimeOrderNode node) + { + var currentExpiry = node.TimeToExpire - current; + node.TimeToExpire = current + expiryCalculator.GetExpireAfterRead(node.Key, node.Value, currentExpiry); + wheel.Reschedule(node); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnWrite(TimeOrderNode node) + { + // if the node is not yet scheduled, it is being created + // the time is set on create in case it is read before the buffer is processed + if (node.GetNextInTimeOrder() == null) + { + wheel.Schedule(node); + } + else + { + var currentExpiry = node.TimeToExpire - current; + node.TimeToExpire = current + expiryCalculator.GetExpireAfterUpdate(node.Key, node.Value, currentExpiry); + wheel.Reschedule(node); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnEvict(TimeOrderNode node) + { + TimerWheel.Deschedule(node); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ExpireEntries

(ref ConcurrentLfuCore, P> cache) where P : struct, INodePolicy> + { + wheel.Advance(ref cache, current); + } } } diff --git a/BitFaster.Caching/Lfu/TimerWheel.cs b/BitFaster.Caching/Lfu/TimerWheel.cs new file mode 100644 index 00000000..6f09fa72 --- /dev/null +++ b/BitFaster.Caching/Lfu/TimerWheel.cs @@ -0,0 +1,287 @@ +using System; + +namespace BitFaster.Caching.Lfu +{ + // Port TimerWheel from Caffeine + // https://github.com/ben-manes/caffeine/blob/73d5011f9db373fc20a6e12d1f194f0d7a967d69/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java#L36 + // This is separate to avoid in memory dupes due to generics + internal static class TimerWheel + { + internal static readonly int[] Buckets = { 64, 64, 32, 4, 1 }; + + internal static readonly long[] Spans = { + BitOps.CeilingPowerOfTwo(Duration.FromSeconds(1).raw), // 1.07s + BitOps.CeilingPowerOfTwo(Duration.FromMinutes(1).raw), // 1.14m + BitOps.CeilingPowerOfTwo(Duration.FromMinutes(60).raw), // 1.22h + BitOps.CeilingPowerOfTwo(Duration.FromMinutes(60*24).raw), // 1.63d + Buckets[3] * BitOps.CeilingPowerOfTwo(Duration.FromMinutes(60*24*6).raw), // 6.5d + Buckets[3] * BitOps.CeilingPowerOfTwo(Duration.FromMinutes(60*24*6).raw), // 6.5d + }; + + internal static readonly int[] Shift = { + BitOps.TrailingZeroCount(Spans[0]), + BitOps.TrailingZeroCount(Spans[1]), + BitOps.TrailingZeroCount(Spans[2]), + BitOps.TrailingZeroCount(Spans[3]), + BitOps.TrailingZeroCount(Spans[4]), + }; + } + + // A hierarchical timer wheel to add, remove, and fire expiration events in amortized O(1) time. The + // expiration events are deferred until the timer is advanced, which is performed as part of the + // cache's maintenance cycle. + // + // This is a direct port of TimerWheel from Java's Caffeine. + // @author ben.manes@gmail.com (Ben Manes) + // https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java + internal sealed class TimerWheel + where K : notnull + { + internal readonly TimeOrderNode[][] wheels; + + internal long time; + + public TimerWheel() + { + wheels = new TimeOrderNode[TimerWheel.Buckets.Length][]; + + for (int i = 0; i < wheels.Length; i++) + { + wheels[i] = new TimeOrderNode[TimerWheel.Buckets[i]]; + + for (int j = 0; j < wheels[i].Length; j++) + { + wheels[i][j] = TimeOrderNode< K, V>.CreateSentinel(); + } + } + } + + ///

+ /// Advances the timer and evicts entries that have expired. + /// + /// + /// + public void Advance(ref ConcurrentLfuCore cache, Duration currentTime) + where N : LfuNode + where P : struct, INodePolicy + { + long previousTime = time; + time = currentTime.raw; + + // If wrapping then temporarily shift the clock for a positive comparison. We assume that the + // advancements never exceed a total running time of long.MaxValue nanoseconds (292 years) + // so that an overflow only occurs due to using an arbitrary origin time. + if ((previousTime < 0) && (currentTime > Duration.Zero)) + { + previousTime += long.MaxValue; + currentTime += new Duration(long.MaxValue); + } + + try + { + for (int i = 0; i < TimerWheel.Shift.Length; i++) + { + long previousTicks = (long)(((ulong)previousTime) >> TimerWheel.Shift[i]); + long currentTicks = (long)(((ulong)currentTime.raw) >> TimerWheel.Shift[i]); + long delta = (currentTicks - previousTicks); + + if (delta <= 0L) + { + break; + } + + Expire(ref cache, i, previousTicks, delta); + } + } + catch (Exception) + { + time = previousTime; + throw; + } + } + + // Expires entries or reschedules into the proper bucket if still active. + private void Expire(ref ConcurrentLfuCore cache, int index, long previousTicks, long delta) + where N : LfuNode + where P : struct, INodePolicy + { + TimeOrderNode[] timerWheel = wheels[index]; + int mask = timerWheel.Length - 1; + + // We assume that the delta does not overflow an integer and cause negative steps. This can + // occur only if the advancement exceeds 2^61 nanoseconds (73 years). + int steps = Math.Min(1 + (int)delta, timerWheel.Length); + int start = (int)(previousTicks & mask); + int end = start + steps; + + for (int i = start; i < end; i++) + { + TimeOrderNode sentinel = timerWheel[i & mask]; + TimeOrderNode prev = sentinel.GetPreviousInTimeOrder(); + TimeOrderNode node = sentinel.GetNextInTimeOrder(); + sentinel.SetPreviousInTimeOrder(sentinel); + sentinel.SetNextInTimeOrder(sentinel); + + while (node != sentinel) + { + TimeOrderNode next = node.GetNextInTimeOrder(); + node.SetPreviousInTimeOrder(null); + node.SetNextInTimeOrder(null); + + try + { + if ((node.GetTimestamp() - time) < 0) + { + cache.Evict(node); + } + else + { + Schedule(node); + } + node = next; + } + catch (Exception) + { + node.SetPreviousInTimeOrder(sentinel.GetPreviousInTimeOrder()); + node.SetNextInTimeOrder(next); + sentinel.GetPreviousInTimeOrder().SetNextInTimeOrder(node); + sentinel.SetPreviousInTimeOrder(prev); + throw; + } + } + } + } + + /// + /// Schedules a timer event for the node. + /// + /// + public void Schedule(TimeOrderNode node) + { + TimeOrderNode sentinel = FindBucket(node.GetTimestamp()); + Link(sentinel, node); + } + + /// + /// Reschedules an active timer event for the node. + /// + /// + public void Reschedule(TimeOrderNode node) + { + if (node.GetNextInTimeOrder() != null) + { + Unlink(node); + Schedule(node); + } + } + + /// + /// Removes a timer event for this entry if present. + /// + /// + public static void Deschedule(TimeOrderNode node) + { + Unlink(node); + node.SetNextInTimeOrder(null); + node.SetPreviousInTimeOrder(null); + } + + // Determines the bucket that the timer event should be added to. + private TimeOrderNode FindBucket(long time) + { + long duration = time - this.time; + int length = wheels.Length - 1; + + for (int i = 0; i < length; i++) + { + if (duration < TimerWheel.Spans[i + 1]) + { + long ticks = (long)((ulong)time >> TimerWheel.Shift[i]); + int index = (int)(ticks & (wheels[i].Length - 1)); + + return wheels[i][index]; + } + } + + return wheels[length][0]; + } + + // Adds the entry at the tail of the bucket's list. + private static void Link(TimeOrderNode sentinel, TimeOrderNode node) + { + node.SetPreviousInTimeOrder(sentinel.GetPreviousInTimeOrder()); + node.SetNextInTimeOrder(sentinel); + + sentinel.GetPreviousInTimeOrder().SetNextInTimeOrder(node); + sentinel.SetPreviousInTimeOrder(node); + } + + // Removes the entry from its bucket, if scheduled. + private static void Unlink(TimeOrderNode node) + { + TimeOrderNode next = node.GetNextInTimeOrder(); + + if (next != null) + { + TimeOrderNode prev = node.GetPreviousInTimeOrder(); + next.SetPreviousInTimeOrder(prev); + prev.SetNextInTimeOrder(next); + } + } + + // Returns the duration until the next bucket expires, or long.MaxValue if none. + public Duration GetExpirationDelay() + { + for (int i = 0; i < TimerWheel.Shift.Length; i++) + { + TimeOrderNode[] timerWheel = wheels[i]; + long ticks = (long)((ulong)time >> TimerWheel.Shift[i]); + + long spanMask = TimerWheel.Spans[i] - 1; + int start = (int)(ticks & spanMask); + int end = start + timerWheel.Length; + int mask = timerWheel.Length - 1; + + for (int j = start; j < end; j++) + { + TimeOrderNode sentinel = timerWheel[(j & mask)]; + TimeOrderNode next = sentinel.GetNextInTimeOrder(); + + if (next == sentinel) + { + continue; + } + + long buckets = (j - start); + long delay = (buckets << TimerWheel.Shift[i]) - (time & spanMask); + delay = (delay > 0) ? delay : TimerWheel.Spans[i]; + + for (int k = i + 1; k < TimerWheel.Shift.Length; k++) + { + long nextDelay = PeekAhead(k); + delay = Math.Min(delay, nextDelay); + } + + return new Duration(delay); + } + } + + return new Duration(long.MaxValue); + } + + // Returns the duration when the wheel's next bucket expires, or long.MaxValue if empty. + private long PeekAhead(int index) + { + long ticks = (long)((ulong)time >> TimerWheel.Shift[index]); + TimeOrderNode[] timerWheel = wheels[index]; + + long spanMask = TimerWheel.Spans[index] - 1; + int mask = timerWheel.Length - 1; + int probe = (int)((ticks + 1) & mask); + TimeOrderNode sentinel = timerWheel[probe]; + TimeOrderNode next = sentinel.GetNextInTimeOrder(); + + return (next == sentinel) ? long.MaxValue: (TimerWheel.Spans[index] - (time & spanMask)); + } + } +}