Skip to content

Commit

Permalink
count
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Peck committed Oct 14, 2023
1 parent f3c7a6d commit af30b98
Showing 1 changed file with 46 additions and 38 deletions.
84 changes: 46 additions & 38 deletions BitFaster.Caching.UnitTests/Buffers/MpscBoundedBufferSoakTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class MpscBoundedBufferSoakTests
private readonly ITestOutputHelper testOutputHelper;
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(30);

private readonly MpscBoundedBuffer<string> buffer = new MpscBoundedBuffer<string>(1024);

public MpscBoundedBufferSoakTests(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;
Expand All @@ -22,8 +24,6 @@ public MpscBoundedBufferSoakTests(ITestOutputHelper testOutputHelper)
[Fact]
public async Task WhenAddIsContendedBufferCanBeFilled()
{
var buffer = new MpscBoundedBuffer<string>(1024);

await Threaded.Run(4, () =>
{
while (buffer.TryAdd("hello") != BufferStatus.Full)
Expand All @@ -39,25 +39,7 @@ public async Task WhileBufferIsFilledItemsCanBeTaken()
{
this.testOutputHelper.WriteLine($"ProcessorCount={Environment.ProcessorCount}.");

var buffer = new MpscBoundedBuffer<string>(1024);

var fill = Threaded.Run(4, () =>
{
var spin = new SpinWait();
int count = 0;
while (count < 256)
{
while (true)
{
if (buffer.TryAdd("hello") == BufferStatus.Success)
{
break;
}
spin.SpinOnce();
}
count++;
}
});
var fill = CreateParallelFill(buffer, threads: 4, itemsPerThread: 256);

var take = Task.Run(() =>
{
Expand All @@ -83,13 +65,53 @@ public async Task WhileBufferIsFilledBufferCanBeDrained()
{
this.testOutputHelper.WriteLine($"ProcessorCount={Environment.ProcessorCount}.");

var buffer = new MpscBoundedBuffer<string>(1024);
var fill = CreateParallelFill(buffer, threads: 4, itemsPerThread: 256);

var fill = Threaded.Run(4, () =>
var drain = Task.Run(() =>
{
int drained = 0;
var drainBuffer = new ArraySegment<string>(new string[1024]);

while (drained < 1024)
{
drained += buffer.DrainTo(drainBuffer);
}
});

await fill.TimeoutAfter(Timeout, "fill timed out");
await drain.TimeoutAfter(Timeout, "drain timed out");
}

[Fact]
public async Task WhileBufferIsFilledCountCanBeTaken()
{
this.testOutputHelper.WriteLine($"ProcessorCount={Environment.ProcessorCount}.");

var fill = CreateParallelFill(buffer, threads:4, itemsPerThread:256);

var count = Task.Run(() =>
{
int count = 0;

while (!fill.IsCompleted)
{
int newcount = buffer.Count;
newcount.Should().BeGreaterThanOrEqualTo(count);
count = newcount;
}
});

await fill.TimeoutAfter(Timeout, "fill timed out");
await count.TimeoutAfter(Timeout, "count timed out");
}

private Task CreateParallelFill(MpscBoundedBuffer<string> buffer, int threads, int itemsPerThread)
{
return Threaded.Run(threads, () =>
{
var spin = new SpinWait();
int count = 0;
while (count < 256)
while (count < itemsPerThread)
{
while (true)
{
Expand All @@ -102,20 +124,6 @@ public async Task WhileBufferIsFilledBufferCanBeDrained()
count++;
}
});

var drain = Task.Run(() =>
{
int drained = 0;
var drainBuffer = new ArraySegment<string>(new string[1024]);

while (drained < 1024)
{
drained += buffer.DrainTo(drainBuffer);
}
});

await fill.TimeoutAfter(Timeout, "fill timed out");
await drain.TimeoutAfter(Timeout, "drain timed out");
}
}
}

0 comments on commit af30b98

Please sign in to comment.