From d126231a92543b4a40b5ee75f0e5f5e534cbbee3 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 11 Nov 2023 10:06:58 +0100 Subject: [PATCH] Implemented Callback reactive operator --- .../ReactiveTests/InnerOperatorsTests.cs | 39 +++++++++++++++++++ .../Reactive/Extensions/InnerOperators.cs | 9 +++++ 2 files changed, 48 insertions(+) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/ReactiveTests/InnerOperatorsTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/ReactiveTests/InnerOperatorsTests.cs index 12d31b83..053a423d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/ReactiveTests/InnerOperatorsTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/ReactiveTests/InnerOperatorsTests.cs @@ -383,4 +383,43 @@ public async Task DistinctBySuccessfullyFiltersOutDuplicates() } #endregion + + #region WithCallback + + [TestMethod] + public void CallbackOperatorIsInvokedOnEachEmit() + { + var source = new Source(NoOpTimeoutProvider.Instance); + source.SignalNext("hello"); + source.SignalNext("world"); + + var emittedSoFar = new List(); + var task = source + .OfType() + .Take(4) + .Callback(s => emittedSoFar.Add(s)) + .Completion(); + + emittedSoFar.Count.ShouldBe(2); + emittedSoFar[0].ShouldBe("hello"); + emittedSoFar[1].ShouldBe("world"); + + task.IsCompleted.ShouldBeFalse(); + + source.SignalNext("and"); + source.SignalNext("universe"); + + emittedSoFar.Count.ShouldBe(4); + emittedSoFar[2].ShouldBe("and"); + emittedSoFar[3].ShouldBe("universe"); + + task.IsCompletedSuccessfully.ShouldBeTrue(); + + source.SignalNext("and"); + source.SignalNext("multiverse"); + + emittedSoFar.Count.ShouldBe(4); + } + + #endregion } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs b/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs index b15c731d..1a659cf0 100644 --- a/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs +++ b/Core/Cleipnir.ResilientFunctions/Reactive/Extensions/InnerOperators.cs @@ -167,4 +167,13 @@ public static IReactiveChain DistinctBy(this IReactiveChain s, Fu notify(next); }; }); + + public static IReactiveChain Callback(this IReactiveChain s, Action action) + => s.WithOperator( + (next, notify, _, _) => + { + action(next); + notify(next); + } + ); } \ No newline at end of file