Skip to content

Commit

Permalink
Remove overwrite parameter from timeout registration method
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Jul 31, 2024
1 parent 8af93bd commit 82ebf11
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests.RFunctionTests;
Expand All @@ -11,4 +9,8 @@ public class TimeoutTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.RFun
[TestMethod]
public override Task ExpiredTimeoutIsAddedToMessages()
=> ExpiredTimeoutIsAddedToMessages(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
=> ExpiredTimeoutMakesReactiveChainThrowTimeoutException(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ namespace Cleipnir.ResilientFunctions.Tests.ReactiveTests;
public class NoOpTimeoutProvider : ITimeoutProvider
{
public static NoOpTimeoutProvider Instance { get; } = new();
public Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false)
public Task RegisterTimeout(string timeoutId, DateTime expiresAt)
=> Task.CompletedTask;

public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false)
public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn)
=> Task.CompletedTask;

public Task CancelTimeout(string timeoutId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Cleipnir.ResilientFunctions.Tests.ReactiveTests;

internal static class ReactiveTestExtensions
{
public static IReactiveChain<object> TakeUntilTimeout(this Source s, string timeoutEventId, TimeSpan expiresIn, bool overwrite = false)
=> new TimeoutOperator<object>(s, timeoutEventId, DateTime.UtcNow.Add(expiresIn), overwrite);
public static IReactiveChain<object> TakeUntilTimeout(this Source s, string timeoutEventId, TimeSpan expiresIn)
=> new TimeoutOperator<object>(s, timeoutEventId, DateTime.UtcNow.Add(expiresIn));

public static IReactiveChain<object> TakeUntilTimeout(this Source s, string timeoutEventId, DateTime expiresAt, bool overwrite = false)
=> new TimeoutOperator<object>(s, timeoutEventId, expiresAt, overwrite);
public static IReactiveChain<object> TakeUntilTimeout(this Source s, string timeoutEventId, DateTime expiresAt)
=> new TimeoutOperator<object>(s, timeoutEventId, expiresAt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Cleipnir.ResilientFunctions.Domain.Events;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Reactive.Extensions;
using Cleipnir.ResilientFunctions.Reactive.Origin;
using Cleipnir.ResilientFunctions.Reactive.Utilities;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Shouldly;
Expand Down Expand Up @@ -120,15 +119,15 @@ public List<Tuple<string, DateTime>> Registrations
private readonly object _sync = new();
private readonly List<Tuple<string, DateTime>> _registrations = new();

public Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false)
public Task RegisterTimeout(string timeoutId, DateTime expiresAt)
{
lock (_sync)
_registrations.Add(Tuple.Create(timeoutId, expiresAt));

return Task.CompletedTask;
}

public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false)
public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn)
=> Task.FromException(new Exception("Stub-method invocation"));

public Task CancelTimeout(string timeoutId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Events;
using Cleipnir.ResilientFunctions.Reactive.Extensions;
using Cleipnir.ResilientFunctions.Reactive.Utilities;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Shouldly;
Expand All @@ -13,7 +14,6 @@ namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.RFunctionTests;
public abstract class TimeoutTests
{
public abstract Task ExpiredTimeoutIsAddedToMessages();

protected async Task ExpiredTimeoutIsAddedToMessages(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
Expand All @@ -28,7 +28,7 @@ protected async Task ExpiredTimeoutIsAddedToMessages(Task<IFunctionStore> storeT
messagesDefaultMaxWaitForCompletion: TimeSpan.MaxValue
)
);
var rFunc = functionsRegistry.RegisterAction(
var rAction = functionsRegistry.RegisterAction(
flowType,
inner: async Task (string _, Workflow workflow) =>
{
Expand All @@ -41,7 +41,40 @@ protected async Task ExpiredTimeoutIsAddedToMessages(Task<IFunctionStore> storeT
}
).Invoke;

await rFunc.Invoke("instanceId", "hello world");
await rAction.Invoke("instanceId", "hello world");
unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
}

public abstract Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException();
protected async Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var flowId = TestFlowId.Create();
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
using var functionsRegistry = new FunctionsRegistry
(
store,
new Settings(
unhandledExceptionHandler.Catch,
watchdogCheckFrequency: TimeSpan.FromMilliseconds(500),
messagesDefaultMaxWaitForCompletion: TimeSpan.MaxValue
)
);
var rAction = functionsRegistry.RegisterAction(
flowId.Type,
inner: async Task (string _, Workflow workflow) =>
{
var messages = workflow.Messages;
await messages
.TakeUntilTimeout("TimeoutId", expiresIn: TimeSpan.FromMilliseconds(500))
.FirstOfType<string>();
}
).Invoke;

await Should.ThrowAsync<NoResultException>(
() => rAction.Invoke("instanceId", "hello world")
);

unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected async Task TimeoutIsNotRegisteredAgainWhenProviderAlreadyContainsTimeo
pendingTimeouts.Count.ShouldBe(1);
pendingTimeouts.Single().TimeoutId.ShouldBe("timeoutId1");

await timeoutProvider.RegisterTimeout("timeoutId1", expiresIn: TimeSpan.FromHours(1), overwrite: false);
await timeoutProvider.RegisterTimeout("timeoutId1", expiresIn: TimeSpan.FromHours(1));
upsertCount.ShouldBe(1);
}

Expand Down
17 changes: 10 additions & 7 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/TimeoutProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Cleipnir.ResilientFunctions.CoreRuntime;

public interface ITimeoutProvider
{
Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false);
Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false);
Task RegisterTimeout(string timeoutId, DateTime expiresAt);
Task RegisterTimeout(string timeoutId, TimeSpan expiresIn);
Task CancelTimeout(string timeoutId);
Task<IReadOnlyList<TimeoutEvent>> PendingTimeouts();
}
Expand Down Expand Up @@ -53,21 +53,24 @@ private async Task<Dictionary<string, TimeoutEvent>> GetRegisteredTimeouts()
return localTimeouts;
}

public async Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false)
public async Task RegisterTimeout(string timeoutId, DateTime expiresAt)
{
var registeredTimeouts = await GetRegisteredTimeouts();
lock (_sync)
if (registeredTimeouts.ContainsKey(timeoutId) && !overwrite)
if (registeredTimeouts.ContainsKey(timeoutId))
return;
else
registeredTimeouts[timeoutId] = new TimeoutEvent(timeoutId, expiresAt.ToUniversalTime());

expiresAt = expiresAt.ToUniversalTime();
await _timeoutStore.UpsertTimeout(new StoredTimeout(_flowId, timeoutId, expiresAt.Ticks), overwrite);
await _timeoutStore.UpsertTimeout(
new StoredTimeout(_flowId, timeoutId, expiresAt.Ticks),
overwrite: true
);
}

public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false)
=> RegisterTimeout(timeoutId, expiresAt: DateTime.UtcNow.Add(expiresIn), overwrite);
public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn)
=> RegisterTimeout(timeoutId, expiresAt: DateTime.UtcNow.Add(expiresIn));

public async Task CancelTimeout(string timeoutId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ public static IReactiveChain<T> TakeUntil<T>(this IReactiveChain<T> s, Func<T, b
});
}

public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, TimeSpan expiresIn, bool overwrite = false)
=> new TimeoutOperator<object>(s.Source, timeoutEventId, DateTime.UtcNow.Add(expiresIn), overwrite);
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, DateTime expiresAt, bool overwrite = false)
=> new TimeoutOperator<object>(s.Source, timeoutEventId, expiresAt, overwrite);
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, TimeSpan expiresIn)
=> new TimeoutOperator<object>(s.Source, timeoutEventId, DateTime.UtcNow.Add(expiresIn));
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, DateTime expiresAt)
=> new TimeoutOperator<object>(s.Source, timeoutEventId, expiresAt);

public static IReactiveChain<T> Skip<T>(this IReactiveChain<T> s, int toSkip)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ public class TimeoutOperator<T> : IReactiveChain<T>
private readonly IReactiveChain<T> _inner;
private readonly string _timeoutId;
private readonly DateTime _expiresAt;
private readonly bool _overwriteExisting;

public TimeoutOperator(IReactiveChain<T> inner, string timeoutId, DateTime expiresAt, bool overwriteExisting)
public TimeoutOperator(IReactiveChain<T> inner, string timeoutId, DateTime expiresAt)
{
_inner = inner;
_timeoutId = timeoutId;
_expiresAt = expiresAt;
_overwriteExisting = overwriteExisting;
}

public ISubscription Subscribe(Action<T> onNext, Action onCompletion, Action<Exception> onError)
Expand All @@ -29,7 +27,6 @@ public ISubscription Subscribe(Action<T> onNext, Action onCompletion, Action<Exc
_inner,
_timeoutId,
_expiresAt,
_overwriteExisting,
onNext,
onCompletion,
onError
Expand All @@ -40,7 +37,6 @@ private class Subscription : ISubscription
{
private readonly string _timeoutId;
private readonly DateTime _expiresAt;
private readonly bool _overwriteExisting;

private readonly Action<T> _signalNext;
private readonly Action _signalCompletion;
Expand All @@ -54,12 +50,11 @@ private class Subscription : ISubscription

public Subscription(
IReactiveChain<T> inner,
string timeoutId, DateTime expiresAt, bool overwriteExisting,
string timeoutId, DateTime expiresAt,
Action<T> signalNext, Action signalCompletion, Action<Exception> signalError)
{
_timeoutId = timeoutId;
_expiresAt = expiresAt;
_overwriteExisting = overwriteExisting;
_signalNext = signalNext;
_signalCompletion = signalCompletion;
_signalError = signalError;
Expand All @@ -82,7 +77,7 @@ public async Task RegisterTimeoutIfNotInExistingEvents()

try
{
await _innerSubscription.TimeoutProvider.RegisterTimeout(_timeoutId, _expiresAt, _overwriteExisting);
await _innerSubscription.TimeoutProvider.RegisterTimeout(_timeoutId, _expiresAt);
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ public class TimeoutTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.RFun
[TestMethod]
public override Task ExpiredTimeoutIsAddedToMessages()
=> ExpiredTimeoutIsAddedToMessages(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
=> ExpiredTimeoutMakesReactiveChainThrowTimeoutException(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ public class TimeoutTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.RFun
[TestMethod]
public override Task ExpiredTimeoutIsAddedToMessages()
=> ExpiredTimeoutIsAddedToMessages(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
=> ExpiredTimeoutMakesReactiveChainThrowTimeoutException(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ public class TimeoutTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.RFun
[TestMethod]
public override Task ExpiredTimeoutIsAddedToMessages()
=> ExpiredTimeoutIsAddedToMessages(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
=> ExpiredTimeoutMakesReactiveChainThrowTimeoutException(FunctionStoreFactory.Create());
}

0 comments on commit 82ebf11

Please sign in to comment.