Skip to content

Commit

Permalink
Fixed effect issue when changing timeouts from control panel
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Dec 29, 2024
1 parent 3600af4 commit ce44c17
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
public override Task RegisteredTimeoutIsCancelledAfterReactiveChainCompletes()
=> RegisteredTimeoutIsCancelledAfterReactiveChainCompletes(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeRemovedFromControlPanel()
=> PendingTimeoutCanBeRemovedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeUpdatedFromControlPanel()
=> PendingTimeoutCanBeUpdatedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredImplicitTimeoutsAreAddedToMessages()
=> ExpiredImplicitTimeoutsAreAddedToMessages(FunctionStoreFactory.Create());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,8 @@ protected async Task ExistingTimeoutCanBeUpdatedForAction(Task<IFunctionStore> s
flowType,
Task (string param, Workflow workflow) =>
workflow.Messages.RegisteredTimeouts.RegisterTimeout(
"someTimeoutId".ToEffectId(EffectType.System), expiresAt: new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc)
"someTimeoutId",
expiresAt: new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc)
)
);

Expand Down Expand Up @@ -1554,7 +1555,8 @@ protected async Task ExistingTimeoutCanBeUpdatedForFunc(Task<IFunctionStore> sto
async Task<string> (string param, Workflow workflow) =>
{
await workflow.Messages.RegisteredTimeouts.RegisterTimeout(
"someTimeoutId".ToEffectId(EffectType.System), expiresAt: new DateTime(2100, 1, 1, 1, 1, 1, DateTimeKind.Utc)
"someTimeoutId",
expiresAt: new DateTime(2100, 1, 1, 1, 1, 1, DateTimeKind.Utc)
);

return param;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,116 @@ await controlPanel
unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
}

public abstract Task PendingTimeoutCanBeRemovedFromControlPanel();
protected async Task PendingTimeoutCanBeRemovedFromControlPanel(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var flowId = TestFlowId.Create();
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
using var functionsRegistry = new FunctionsRegistry
(
store,
new Settings(
unhandledExceptionHandler.Catch,
messagesDefaultMaxWaitForCompletion: TimeSpan.Zero
)
);
var registration = functionsRegistry.RegisterParamless(
flowId.Type,
inner: Task (workflow) =>
workflow
.Messages
.TakeUntilTimeout("TimeoutId4321", expiresIn: TimeSpan.FromMinutes(10))
.First()
);

await registration.Schedule("someInstanceId");

var controlPanel = await registration.ControlPanel("someInstanceId");
controlPanel.ShouldNotBeNull();
await controlPanel.BusyWaitUntil(cp => cp.Status == Status.Suspended);

var registeredTimeouts = await controlPanel.RegisteredTimeouts.All;
registeredTimeouts.Count.ShouldBe(1);
var registeredTimeout = registeredTimeouts.First();

var id = registeredTimeout.TimeoutId;
id.Id.ShouldBe("TimeoutId4321");
id.Type.ShouldBe(EffectType.Timeout);

var timeouts = (await store.TimeoutStore.GetTimeouts(controlPanel.StoredId)).ToList();
timeouts.Count.ShouldBe(1);
var timeout = timeouts.First();
timeout.TimeoutId.ShouldBe(id);

await controlPanel.RegisteredTimeouts.Remove(id);

await controlPanel.Refresh();

await controlPanel.RegisteredTimeouts.All.ShouldBeEmptyAsync();
await store.TimeoutStore.GetTimeouts(controlPanel.StoredId).ShouldBeEmptyAsync();

unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
}

public abstract Task PendingTimeoutCanBeUpdatedFromControlPanel();
protected async Task PendingTimeoutCanBeUpdatedFromControlPanel(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var flowId = TestFlowId.Create();
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
using var functionsRegistry = new FunctionsRegistry
(
store,
new Settings(
unhandledExceptionHandler.Catch,
messagesDefaultMaxWaitForCompletion: TimeSpan.Zero
)
);
var registration = functionsRegistry.RegisterParamless(
flowId.Type,
inner: Task (workflow) =>
workflow
.Messages
.TakeUntilTimeout("TimeoutId4321", expiresIn: TimeSpan.FromMinutes(10))
.First()
);

await registration.Schedule("someInstanceId");

var controlPanel = await registration.ControlPanel("someInstanceId");
controlPanel.ShouldNotBeNull();
await controlPanel.BusyWaitUntil(cp => cp.Status == Status.Suspended);

var registeredTimeouts = await controlPanel.RegisteredTimeouts.All;
registeredTimeouts.Count.ShouldBe(1);
var registeredTimeout = registeredTimeouts.First();

var id = registeredTimeout.TimeoutId;
id.Id.ShouldBe("TimeoutId4321");
id.Type.ShouldBe(EffectType.Timeout);

var timeouts = (await store.TimeoutStore.GetTimeouts(controlPanel.StoredId)).ToList();
timeouts.Count.ShouldBe(1);
var timeout = timeouts.First();
timeout.TimeoutId.ShouldBe(id);

await controlPanel.RegisteredTimeouts.Upsert(id, new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc));

await controlPanel.Refresh();

registeredTimeout = (await controlPanel.RegisteredTimeouts.All).Single();
timeout = (await store.TimeoutStore.GetTimeouts(controlPanel.StoredId)).Single();

registeredTimeout.TimeoutId.ShouldBe(id);
timeout.TimeoutId.ShouldBe(id);

registeredTimeout.Expiry.ShouldBe(new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc));
new DateTime(timeout.Expiry).ShouldBe(new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc));

unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
}

public abstract Task ExpiredImplicitTimeoutsAreAddedToMessages();
protected async Task ExpiredImplicitTimeoutsAreAddedToMessages(Task<IFunctionStore> storeTask)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public ExistingStates CreateExistingStates(FlowId flowId)

public ExistingEffects CreateExistingEffects(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.EffectsStore, _settings.Serializer);
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, _settings.Serializer);
public ExistingRegisteredTimeouts CreateExistingTimeouts(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.TimeoutStore);
public ExistingRegisteredTimeouts CreateExistingTimeouts(FlowId flowId, ExistingEffects existingEffects) => new(MapToStoredId(flowId), _functionStore.TimeoutStore, existingEffects);
public ExistingSemaphores CreateExistingSemaphores(FlowId flowId) => new(MapToStoredId(flowId), _functionStore, CreateExistingEffects(flowId));

public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect)
Expand Down
18 changes: 11 additions & 7 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/RegisteredTimeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ public interface IRegisteredTimeouts
Task<IReadOnlyList<RegisteredTimeout>> PendingTimeouts();
}

public enum TimeoutStatus
{
Created,
Registered,
Cancelled
}

public class RegisteredTimeouts(StoredId storedId, ITimeoutStore timeoutStore, Effect effect) : IRegisteredTimeouts
{
private enum TimeoutStatus
{
Created,
Registered,
Cancelled
}


public string GetNextImplicitId() => EffectContext.CurrentContext.NextImplicitId();

Expand All @@ -41,7 +43,9 @@ await timeoutStore.UpsertTimeout(
}

public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn)
=> RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId, EffectType.System), expiresAt: DateTime.UtcNow.Add(expiresIn));
=> RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId, EffectType.Timeout), expiresAt: DateTime.UtcNow.Add(expiresIn));
public Task RegisterTimeout(string timeoutId, DateTime expiresAt)
=> RegisterTimeout(EffectId.CreateWithCurrentContext(timeoutId, EffectType.Timeout), expiresAt);
public Task RegisterTimeout(EffectId timeoutId, TimeSpan expiresIn)
=> RegisterTimeout(timeoutId, expiresAt: DateTime.UtcNow.Add(expiresIn));

Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public async Task Refresh()
Effects = _invocationHelper.CreateExistingEffects(FlowId);
Messages = _invocationHelper.CreateExistingMessages(FlowId);
States = _invocationHelper.CreateExistingStates(FlowId);
RegisteredTimeouts = _invocationHelper.CreateExistingTimeouts(FlowId);
RegisteredTimeouts = _invocationHelper.CreateExistingTimeouts(FlowId, Effects);
Correlations = _invocationHelper.CreateCorrelations(FlowId);

_innerParamChanged = false;
Expand Down
17 changes: 10 additions & 7 deletions Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<U
var functionState = await _invocationHelper.GetFunction(storedId);
if (functionState == null)
return null;


var existingEffects = _invocationHelper.CreateExistingEffects(flowId);
return new ControlPanel(
_invoker,
_invocationHelper,
Expand All @@ -36,11 +37,11 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<U
functionState.Status,
functionState.Epoch,
functionState.Expires,
_invocationHelper.CreateExistingEffects(flowId),
existingEffects,
_invocationHelper.CreateExistingStates(flowId),
_invocationHelper.CreateExistingMessages(flowId),
_invocationHelper.CreateExistingSemaphores(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId, existingEffects),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
Expand Down Expand Up @@ -71,6 +72,7 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
if (functionState == null)
return null;

var existingEffects = _invocationHelper.CreateExistingEffects(flowId);
return new ControlPanel<TParam>(
_invoker,
_invocationHelper,
Expand All @@ -80,11 +82,11 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
functionState.Epoch,
functionState.Expires,
functionState.Param!,
_invocationHelper.CreateExistingEffects(flowId),
existingEffects,
_invocationHelper.CreateExistingStates(flowId),
_invocationHelper.CreateExistingMessages(flowId),
_invocationHelper.CreateExistingSemaphores(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId, existingEffects),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
Expand Down Expand Up @@ -114,6 +116,7 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
if (f == null)
return null;

var existingEffects = _invocationHelper.CreateExistingEffects(flowId);
return new ControlPanel<TParam, TReturn>(
_invoker,
_invocationHelper,
Expand All @@ -124,11 +127,11 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
f.Expires,
f.Param!,
f.Result,
_invocationHelper.CreateExistingEffects(flowId),
existingEffects,
_invocationHelper.CreateExistingStates(flowId),
_invocationHelper.CreateExistingMessages(flowId),
_invocationHelper.CreateExistingSemaphores(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId, existingEffects),
_invocationHelper.CreateCorrelations(flowId),
f.PreviouslyThrownException
);
Expand Down
4 changes: 3 additions & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/EffectType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ public enum EffectType
{
Effect = 'E',
State = 'S',
System = 'Y'
System = 'Y',
Timeout = 'T'
}

public static class EffectTypeExtensions
{
public static bool IsState(this EffectType effectType) => effectType == EffectType.State;
public static bool IsSystem(this EffectType effectType) => effectType == EffectType.System;
public static bool IsEffect(this EffectType effectType) => effectType == EffectType.Effect;
public static bool IsTimeout(this EffectType effectType) => effectType == EffectType.Timeout;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.Domain;

public class ExistingRegisteredTimeouts(StoredId storedId, ITimeoutStore timeoutStore)
public class ExistingRegisteredTimeouts(StoredId storedId, ITimeoutStore timeoutStore, ExistingEffects effects)
{
private Dictionary<EffectId, DateTime>? _timeouts;

Expand All @@ -23,7 +24,7 @@ private async Task<Dictionary<EffectId, DateTime>> GetTimeouts()
);
}

public Task<DateTime> this[TimeoutId timeoutId] => this[new EffectId(timeoutId.Value, EffectType.System, Context: "")];
public Task<DateTime> this[TimeoutId timeoutId] => this[new EffectId(timeoutId.Value, EffectType.Timeout, Context: "")];
public Task<DateTime> this[EffectId timeoutId] => GetTimeouts().ContinueWith(t => t.Result[timeoutId]);

public Task<IReadOnlyList<RegisteredTimeout>> All
Expand All @@ -34,24 +35,26 @@ public Task<IReadOnlyList<RegisteredTimeout>> All
.CastTo<IReadOnlyList<RegisteredTimeout>>()
);

public Task Remove(TimeoutId timeoutId) => Remove(new EffectId(timeoutId.Value, EffectType.System, Context: ""));
public Task Remove(TimeoutId timeoutId) => Remove(new EffectId(timeoutId.Value, EffectType.Timeout, Context: ""));
public async Task Remove(EffectId timeoutId)
{
var timeouts = await GetTimeouts();
await effects.Remove(timeoutId);
await timeoutStore.RemoveTimeout(storedId, timeoutId);
timeouts.Remove(timeoutId);
}

public Task Upsert(TimeoutId timeoutId, DateTime expiresAt)
=> Upsert(new EffectId(timeoutId.Value, EffectType.System, Context: ""), expiresAt);
=> Upsert(new EffectId(timeoutId.Value, EffectType.Timeout, Context: ""), expiresAt);
public async Task Upsert(EffectId timeoutId, DateTime expiresAt)
{
var timeouts = await GetTimeouts();
await timeoutStore.UpsertTimeout(
new StoredTimeout(storedId, timeoutId, expiresAt.ToUniversalTime().Ticks),
overwrite: true
);


await effects.SetValue(timeoutId, TimeoutStatus.Registered);
timeouts[timeoutId] = expiresAt;
}
public Task Upsert(TimeoutId timeoutId, TimeSpan expiresIn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public static IReactiveChain<T> TakeUntil<T>(this IReactiveChain<T> s, Func<T, b
}

public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, TimeSpan expiresIn)
=> new TimeoutOperator<object>(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.System), expiresAt: DateTime.UtcNow.Add(expiresIn));
=> new TimeoutOperator<object>(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.Timeout), expiresAt: DateTime.UtcNow.Add(expiresIn));
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, string timeoutEventId, DateTime expiresAt)
=> new TimeoutOperator<object>(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.System), expiresAt);
=> new TimeoutOperator<object>(s.Source, EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.Timeout), expiresAt);
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, TimeSpan expiresIn)
=> s.TakeUntilTimeout(s.RegisteredTimeouts.GetNextImplicitId(), expiresIn);
public static IReactiveChain<object> TakeUntilTimeout(this Messages s, DateTime expiresAt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static Task<Option<T>> LastOfType<T>(this Messages messages, string timeo
public static async Task SuspendUntil(this Messages s, string timeoutEventId, DateTime resumeAt)
{
var timeoutEmitted = false;
var effectId = EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.System);
var effectId = EffectId.CreateWithCurrentContext(timeoutEventId, EffectType.Timeout);
var subscription = s
.OfType<TimeoutEvent>()
.Where(t => t.TimeoutId == effectId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
[TestMethod]
public override Task RegisteredTimeoutIsCancelledAfterReactiveChainCompletes()
=> RegisteredTimeoutIsCancelledAfterReactiveChainCompletes(FunctionStoreFactory.Create());


[TestMethod]
public override Task PendingTimeoutCanBeRemovedFromControlPanel()
=> PendingTimeoutCanBeRemovedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeUpdatedFromControlPanel()
=> PendingTimeoutCanBeUpdatedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredImplicitTimeoutsAreAddedToMessages()
=> ExpiredImplicitTimeoutsAreAddedToMessages(FunctionStoreFactory.Create());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ public override Task ExpiredTimeoutMakesReactiveChainThrowTimeoutException()
[TestMethod]
public override Task RegisteredTimeoutIsCancelledAfterReactiveChainCompletes()
=> RegisteredTimeoutIsCancelledAfterReactiveChainCompletes(FunctionStoreFactory.Create());


[TestMethod]
public override Task PendingTimeoutCanBeRemovedFromControlPanel()
=> PendingTimeoutCanBeRemovedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task PendingTimeoutCanBeUpdatedFromControlPanel()
=> PendingTimeoutCanBeUpdatedFromControlPanel(FunctionStoreFactory.Create());

[TestMethod]
public override Task ExpiredImplicitTimeoutsAreAddedToMessages()
=> ExpiredImplicitTimeoutsAreAddedToMessages(FunctionStoreFactory.Create());
Expand Down
Loading

0 comments on commit ce44c17

Please sign in to comment.