Skip to content

Commit

Permalink
Removed eager sync on construction of ExistingTimeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Aug 5, 2024
1 parent 4fc3e73 commit cb83960
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1518,20 +1518,22 @@ protected async Task ExistingTimeoutCanBeUpdatedForAction(Task<IFunctionStore> s
var controlPanel = await actionRegistration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();
var timeouts = controlPanel.Timeouts;
timeouts.All.Count.ShouldBe(1);
timeouts["someTimeoutId"].ShouldBe(new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(1);
await timeouts["someTimeoutId"].ShouldBeAsync(new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc));

await timeouts.Upsert("someOtherTimeoutId", new DateTime(2101, 1, 1, 1, 1, 1, DateTimeKind.Utc));
timeouts.All.Count.ShouldBe(2);
timeouts["someOtherTimeoutId"].ShouldBe(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(2);
await timeouts["someOtherTimeoutId"].ShouldBeAsync(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));

await timeouts.Remove("someTimeoutId");
timeouts.All.Count.ShouldBe(1);
(await timeouts.All).Count.ShouldBe(1);

await controlPanel.Refresh();

timeouts.All.Count.ShouldBe(1);
timeouts["someOtherTimeoutId"].ShouldBe(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(1);
await timeouts["someOtherTimeoutId"].ShouldBeAsync(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task ExistingTimeoutCanBeUpdatedForFunc();
Expand Down Expand Up @@ -1561,20 +1563,22 @@ await workflow.Messages.Timeouts.RegisterTimeout(
var controlPanel = await funcRegistration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();
var timeouts = controlPanel.Timeouts;
timeouts.All.Count.ShouldBe(1);
timeouts["someTimeoutId"].ShouldBe(new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(1);
await timeouts["someTimeoutId"].ShouldBeAsync(new DateTime(2100, 1,1, 1,1,1, DateTimeKind.Utc));

await timeouts.Upsert("someOtherTimeoutId", new DateTime(2101, 1, 1, 1, 1, 1, DateTimeKind.Utc));
timeouts.All.Count.ShouldBe(2);
timeouts["someOtherTimeoutId"].ShouldBe(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(2);
await timeouts["someOtherTimeoutId"].ShouldBeAsync(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));

await timeouts.Remove("someTimeoutId");
timeouts.All.Count.ShouldBe(1);
(await timeouts.All).Count.ShouldBe(1);

await controlPanel.Refresh();

timeouts.All.Count.ShouldBe(1);
timeouts["someOtherTimeoutId"].ShouldBe(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));
(await timeouts.All).Count.ShouldBe(1);
await timeouts["someOtherTimeoutId"].ShouldBeAsync(new DateTime(2101, 1,1, 1,1,1, DateTimeKind.Utc));

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task CorrelationsCanBeChanged();
Expand Down Expand Up @@ -1609,6 +1613,8 @@ protected async Task CorrelationsCanBeChanged(Task<IFunctionStore> storeTask)
controlPanel.ShouldNotBeNull();
await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalseAsync();
await controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrueAsync();

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task DeleteRemovesFunctionFromAllStores();
Expand Down Expand Up @@ -1661,5 +1667,7 @@ await store.EffectsStore
.GetEffectResults(functionId)
.SelectAsync(e => e.Any())
.ShouldBeFalseAsync();

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,7 @@ public ExistingStates CreateExistingStates(FlowId flowId, string? defaultState)

public ExistingEffects CreateExistingEffects(FlowId flowId) => new(flowId, _functionStore.EffectsStore, _settings.Serializer);
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(flowId, _functionStore.MessageStore, _settings.Serializer);

public async Task<ExistingTimeouts> GetExistingTimeouts(FlowId flowId)
=> new ExistingTimeouts(
flowId,
_functionStore.TimeoutStore,
await _functionStore.TimeoutStore.GetTimeouts(flowId)
);
public ExistingTimeouts CreateExistingTimeouts(FlowId flowId) => new(flowId, _functionStore.TimeoutStore);

private string? SerializeParameter(TParam param)
{
Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public async Task Refresh()
Effects = _invocationHelper.CreateExistingEffects(FlowId);
Messages = _invocationHelper.CreateExistingMessages(FlowId);
States = _invocationHelper.CreateExistingStates(FlowId, sf.DefaultState);
Timeouts = await _invocationHelper.GetExistingTimeouts(FlowId);
Timeouts = _invocationHelper.CreateExistingTimeouts(FlowId);
Correlations = _invocationHelper.CreateCorrelations(FlowId);

_innerParamChanged = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ internal ControlPanelFactory(FlowType flowType, Invoker<Unit, Unit> invoker, Inv
_invocationHelper.CreateExistingEffects(flowId),
_invocationHelper.CreateExistingStates(flowId, functionState.DefaultState),
_invocationHelper.CreateExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
Expand Down Expand Up @@ -75,7 +75,7 @@ internal ControlPanelFactory(FlowType flowType, Invoker<TParam, Unit> invoker, I
_invocationHelper.CreateExistingEffects(flowId),
_invocationHelper.CreateExistingStates(flowId, functionState.DefaultState),
_invocationHelper.CreateExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
Expand Down Expand Up @@ -115,7 +115,7 @@ internal ControlPanelFactory(FlowType flowType, Invoker<TParam, TReturn> invoker
_invocationHelper.CreateExistingEffects(flowId),
_invocationHelper.CreateExistingStates(flowId, f.DefaultState),
_invocationHelper.CreateExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
f.PreviouslyThrownException
);
Expand Down
58 changes: 33 additions & 25 deletions Core/Cleipnir.ResilientFunctions/Domain/ExistingTimeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,56 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain.Events;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.Domain;

public class ExistingTimeouts
public class ExistingTimeouts(FlowId flowId, ITimeoutStore timeoutStore)
{
private readonly FlowId _flowId;
private readonly ITimeoutStore _timeoutStore;
private readonly Dictionary<string, DateTime> _timeouts;

public ExistingTimeouts(FlowId flowId, ITimeoutStore timeoutStore, IEnumerable<StoredTimeout> storedTimeouts)
private Dictionary<TimeoutId, DateTime>? _timeouts;

private async Task<Dictionary<TimeoutId, DateTime>> GetTimeouts()
{
_flowId = flowId;
_timeoutStore = timeoutStore;
_timeouts = storedTimeouts.ToDictionary(s => s.TimeoutId, s => new DateTime(s.Expiry, DateTimeKind.Utc));
if (_timeouts is not null)
return _timeouts;

var storedTimeouts = await timeoutStore.GetTimeouts(flowId);
return _timeouts = storedTimeouts.ToDictionary(
s => new TimeoutId(s.TimeoutId),
s => new DateTime(s.Expiry, DateTimeKind.Utc)
);
}

public DateTime this[string timeoutId] => _timeouts[timeoutId];

public IReadOnlyList<TimeoutEvent> All
=> _timeouts
.Select(kv => new TimeoutEvent(kv.Key, kv.Value))
.ToList();

public async Task Remove(string timeoutId)
public Task<DateTime> this[TimeoutId timeoutId] => GetTimeouts().ContinueWith(t => t.Result[timeoutId]);

public Task<IReadOnlyList<RegisteredTimeout>> All

Check failure on line 28 in Core/Cleipnir.ResilientFunctions/Domain/ExistingTimeouts.cs

View workflow job for this annotation

GitHub Actions / build

The type or namespace name 'RegisteredTimeout' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 28 in Core/Cleipnir.ResilientFunctions/Domain/ExistingTimeouts.cs

View workflow job for this annotation

GitHub Actions / build

The type or namespace name 'RegisteredTimeout' could not be found (are you missing a using directive or an assembly reference?)
=> GetTimeouts().ContinueWith(
t => t.Result
.Select(kv => new RegisteredTimeout(kv.Key, kv.Value))
.ToList()
.CastTo<IReadOnlyList<RegisteredTimeout>>()
);

public async Task Remove(TimeoutId timeoutId)
{
await _timeoutStore.RemoveTimeout(_flowId, timeoutId);
var timeouts = await GetTimeouts();

_timeouts.Remove(timeoutId);
await timeoutStore.RemoveTimeout(flowId, timeoutId.Value);
timeouts.Remove(timeoutId);
}

public async Task Upsert(string timeoutId, DateTime expiresAt)
public async Task Upsert(TimeoutId timeoutId, DateTime expiresAt)
{
await _timeoutStore.UpsertTimeout(
new StoredTimeout(_flowId, timeoutId, expiresAt.ToUniversalTime().Ticks),
var timeouts = await GetTimeouts();
await timeoutStore.UpsertTimeout(
new StoredTimeout(flowId, timeoutId.Value, expiresAt.ToUniversalTime().Ticks),
overwrite: true
);

_timeouts[timeoutId] = expiresAt;
timeouts[timeoutId] = expiresAt;
}

public Task Upsert(string timeoutId, TimeSpan expiresIn)
public Task Upsert(TimeoutId timeoutId, TimeSpan expiresIn)
=> Upsert(timeoutId, expiresAt: DateTime.UtcNow.Add(expiresIn));
}
24 changes: 24 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Domain/TimeoutId.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;

namespace Cleipnir.ResilientFunctions.Domain;

public class TimeoutId
{
public string Value { get; }
public TimeoutId(string value)
{
ArgumentNullException.ThrowIfNull(value);
Delimiters.EnsureNoUnitSeparator(value);

Value = value;
}

public static implicit operator TimeoutId(string flowInstance) => new(flowInstance);
public override string ToString() => Value;
public static bool operator ==(TimeoutId id1, TimeoutId id2) => id1.Equals(id2);
public static bool operator !=(TimeoutId id1, TimeoutId id2) => !(id1 == id2);

public override bool Equals(object? obj)
=> obj is TimeoutId id && id.Value == Value;
public override int GetHashCode() => Value.GetHashCode();
}

0 comments on commit cb83960

Please sign in to comment.