diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuSoakTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuSoakTests.cs index b78fc982..c1f1122f 100644 --- a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuSoakTests.cs +++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuSoakTests.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Reflection; using System.Threading.Tasks; +using BitFaster.Caching.Buffers; using BitFaster.Caching.Lfu; using BitFaster.Caching.Scheduler; using FluentAssertions; @@ -12,10 +14,101 @@ namespace BitFaster.Caching.UnitTests.Lfu [Collection("Soak")] public class ConcurrentLfuSoakTests { + private const int iterations = 10; private readonly ITestOutputHelper output; public ConcurrentLfuSoakTests(ITestOutputHelper testOutputHelper) { this.output = testOutputHelper; + } + + [Theory] + [Repeat(iterations)] + public async Task WhenConcurrentGetCacheEndsInConsistentState(int iteration) + { + var scheduler = new BackgroundThreadScheduler(); + var lfu = new ConcurrentLfuBuilder().WithCapacity(9).WithScheduler(scheduler).Build() as ConcurrentLfu; + + await Threaded.Run(4, () => { + for (int i = 0; i < 100000; i++) + { + lfu.GetOrAdd(i + 1, i => i.ToString()); + } + }); + + this.output.WriteLine($"iteration {iteration} keys={string.Join(" ", lfu.Keys)}"); + + scheduler.Dispose(); + await scheduler.Completion; + + RunIntegrityCheck(lfu); + } + + [Theory] + [Repeat(iterations)] + public async Task WhenConcurrentGetAsyncCacheEndsInConsistentState(int iteration) + { + var scheduler = new BackgroundThreadScheduler(); + var lfu = new ConcurrentLfuBuilder().WithCapacity(9).WithScheduler(scheduler).Build() as ConcurrentLfu; + + await Threaded.RunAsync(4, async () => { + for (int i = 0; i < 100000; i++) + { + await lfu.GetOrAddAsync(i + 1, i => Task.FromResult(i.ToString())); + } + }); + + this.output.WriteLine($"iteration {iteration} keys={string.Join(" ", lfu.Keys)}"); + + scheduler.Dispose(); + await scheduler.Completion; + + RunIntegrityCheck(lfu); + } + + [Theory] + [Repeat(iterations)] + public async Task WhenConcurrentGetWithArgCacheEndsInConsistentState(int iteration) + { + var scheduler = new BackgroundThreadScheduler(); + var lfu = new ConcurrentLfuBuilder().WithCapacity(9).WithScheduler(scheduler).Build() as ConcurrentLfu; + + await Threaded.Run(4, () => { + for (int i = 0; i < 100000; i++) + { + // use the arg overload + lfu.GetOrAdd(i + 1, (i, s) => i.ToString(), "Foo"); + } + }); + + this.output.WriteLine($"iteration {iteration} keys={string.Join(" ", lfu.Keys)}"); + + scheduler.Dispose(); + await scheduler.Completion; + + RunIntegrityCheck(lfu); + } + + [Theory] + [Repeat(iterations)] + public async Task WhenConcurrentGetAsyncWithArgCacheEndsInConsistentState(int iteration) + { + var scheduler = new BackgroundThreadScheduler(); + var lfu = new ConcurrentLfuBuilder().WithCapacity(9).WithScheduler(scheduler).Build() as ConcurrentLfu; + + await Threaded.RunAsync(4, async () => { + for (int i = 0; i < 100000; i++) + { + // use the arg overload + await lfu.GetOrAddAsync(i + 1, (i, s) => Task.FromResult(i.ToString()), "Foo"); + } + }); + + this.output.WriteLine($"iteration {iteration} keys={string.Join(" ", lfu.Keys)}"); + + scheduler.Dispose(); + await scheduler.Completion; + + RunIntegrityCheck(lfu); } [Fact] @@ -45,6 +138,105 @@ await Threaded.Run(threads, i => this.output.WriteLine($"Maintenance ops {cache.Scheduler.RunCount}"); cache.Metrics.Value.Misses.Should().Be(iterations * threads); + RunIntegrityCheck(cache); + } + + private void RunIntegrityCheck(ConcurrentLfu cache) + { + new ConcurrentLfuIntegrityChecker(cache).Validate(); + } + } + + public class ConcurrentLfuIntegrityChecker + { + private readonly ConcurrentLfu cache; + + private readonly LfuNodeList windowLru; + private readonly LfuNodeList probationLru; + private readonly LfuNodeList protectedLru; + + private readonly StripedMpscBuffer> readBuffer; + private readonly MpscBoundedBuffer> writeBuffer; + + private static FieldInfo windowLruField = typeof(ConcurrentLfu).GetField("windowLru", BindingFlags.NonPublic | BindingFlags.Instance); + private static FieldInfo probationLruField = typeof(ConcurrentLfu).GetField("probationLru", BindingFlags.NonPublic | BindingFlags.Instance); + private static FieldInfo protectedLruField = typeof(ConcurrentLfu).GetField("protectedLru", BindingFlags.NonPublic | BindingFlags.Instance); + + private static FieldInfo readBufferField = typeof(ConcurrentLfu).GetField("readBuffer", BindingFlags.NonPublic | BindingFlags.Instance); + private static FieldInfo writeBufferField = typeof(ConcurrentLfu).GetField("writeBuffer", BindingFlags.NonPublic | BindingFlags.Instance); + + public ConcurrentLfuIntegrityChecker(ConcurrentLfu cache) + { + this.cache = cache; + + // get lrus via reflection + this.windowLru = (LfuNodeList)windowLruField.GetValue(cache); + this.probationLru = (LfuNodeList)probationLruField.GetValue(cache); + this.protectedLru = (LfuNodeList)protectedLruField.GetValue(cache); + + this.readBuffer = (StripedMpscBuffer>)readBufferField.GetValue(cache); + this.writeBuffer = (MpscBoundedBuffer>)writeBufferField.GetValue(cache); + } + + public void Validate() + { + cache.DoMaintenance(); + + // buffers should be empty after maintenance + this.readBuffer.Count.Should().Be(0); + this.writeBuffer.Count.Should().Be(0); + + // all the items in the LRUs must exist in the dictionary. + // no items should be marked as removed after maintenance has run + VerifyLruInDictionary(this.windowLru); + VerifyLruInDictionary(this.probationLru); + VerifyLruInDictionary(this.protectedLru); + + // all the items in the dictionary must exist in the node list + VerifyDictionaryInLrus(); + + // cache must be within capacity + cache.Count.Should().BeLessThanOrEqualTo(cache.Capacity, "capacity out of valid range"); + } + + private void VerifyLruInDictionary(LfuNodeList lfuNodes) + { + var node = lfuNodes.First; + + while (node != null) + { + node.WasRemoved.Should().BeFalse(); + node.WasDeleted.Should().BeFalse(); + cache.TryGet(node.Key, out _).Should().BeTrue(); + + node = node.Next; + } + } + + private void VerifyDictionaryInLrus() + { + foreach (var kvp in this.cache) + { + var exists = Exists(kvp, this.windowLru) || Exists(kvp, this.probationLru) || Exists(kvp, this.protectedLru); + exists.Should().BeTrue($"key {kvp.Key} should exist in LRU lists"); + } + } + + private static bool Exists(KeyValuePair kvp, LfuNodeList lfuNodes) + { + var node = lfuNodes.First; + + while (node != null) + { + if (EqualityComparer.Default.Equals(node.Key, kvp.Key)) + { + return true; + } + + node = node.Next; + } + + return false; } } } diff --git a/BitFaster.Caching.UnitTests/Lru/ConcurrentLruTests.cs b/BitFaster.Caching.UnitTests/Lru/ConcurrentLruTests.cs index 0edcf3e1..ab2bdbc1 100644 --- a/BitFaster.Caching.UnitTests/Lru/ConcurrentLruTests.cs +++ b/BitFaster.Caching.UnitTests/Lru/ConcurrentLruTests.cs @@ -1131,8 +1131,6 @@ private void Warmup() lru.GetOrAdd(-8, valueFactory.Create); lru.GetOrAdd(-9, valueFactory.Create); } - - } public class ConcurrentLruIntegrityChecker