Skip to content

Commit

Permalink
Implemented Callback reactive operator
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Nov 11, 2023
1 parent 0cd1a9f commit d126231
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,43 @@ public async Task DistinctBySuccessfullyFiltersOutDuplicates()
}

#endregion

#region WithCallback

[TestMethod]
public void CallbackOperatorIsInvokedOnEachEmit()
{
var source = new Source(NoOpTimeoutProvider.Instance);
source.SignalNext("hello");
source.SignalNext("world");

var emittedSoFar = new List<string>();
var task = source
.OfType<string>()
.Take(4)
.Callback(s => emittedSoFar.Add(s))
.Completion();

emittedSoFar.Count.ShouldBe(2);
emittedSoFar[0].ShouldBe("hello");
emittedSoFar[1].ShouldBe("world");

task.IsCompleted.ShouldBeFalse();

source.SignalNext("and");
source.SignalNext("universe");

emittedSoFar.Count.ShouldBe(4);
emittedSoFar[2].ShouldBe("and");
emittedSoFar[3].ShouldBe("universe");

task.IsCompletedSuccessfully.ShouldBeTrue();

source.SignalNext("and");
source.SignalNext("multiverse");

emittedSoFar.Count.ShouldBe(4);
}

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,13 @@ public static IReactiveChain<T> DistinctBy<T, TKey>(this IReactiveChain<T> s, Fu
notify(next);
};
});

public static IReactiveChain<T> Callback<T>(this IReactiveChain<T> s, Action<T> action)
=> s.WithOperator<T, T>(
(next, notify, _, _) =>
{
action(next);
notify(next);
}
);
}

0 comments on commit d126231

Please sign in to comment.