Skip to content

Commit

Permalink
TimeoutProvider optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Jul 31, 2024
1 parent e09391f commit 8af93bd
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ await functionStore.CreateFunction(
);
var eventSerializer = new EventSerializer();
var messagesWriter = new MessageWriter(functionId, functionStore, eventSerializer, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromSeconds(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -64,7 +64,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -102,7 +102,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -145,7 +145,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -187,7 +187,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -225,7 +225,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down Expand Up @@ -275,7 +275,7 @@ await functionStore.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
);
var messagesWriter = new MessageWriter(functionId, functionStore, DefaultSerializer.Instance, scheduleReInvocation: (_, _) => Task.CompletedTask);
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore, messagesWriter, timeoutCheckFrequency: TimeSpan.FromSeconds(1));
var timeoutProvider = new TimeoutProvider(functionId, functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
functionId,
defaultDelay: TimeSpan.FromMilliseconds(250),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.Domain.Events;
using Cleipnir.ResilientFunctions.Helpers;

namespace Cleipnir.ResilientFunctions.Tests.ReactiveTests;

Expand All @@ -18,5 +19,7 @@ public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite
public Task CancelTimeout(string timeoutId)
=> Task.CompletedTask;

public Task<List<TimeoutEvent>> PendingTimeouts() => Task.FromResult(new List<TimeoutEvent>());
public Task<IReadOnlyList<TimeoutEvent>> PendingTimeouts() => new List<TimeoutEvent>()
.CastTo<IReadOnlyList<TimeoutEvent>>()
.ToTask();
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite

public Task CancelTimeout(string timeoutId)
=> Task.FromException(new Exception("Stub-method invocation"));
public Task<List<TimeoutEvent>> PendingTimeouts()
=> Task.FromException<List<TimeoutEvent>>(new Exception("Stub-method invocation"));
public Task<IReadOnlyList<TimeoutEvent>> PendingTimeouts()
=> Task.FromException<IReadOnlyList<TimeoutEvent>>(new Exception("Stub-method invocation"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ protected async Task RegisteredTimeoutIsReturnedFromTimeoutProvider(Task<ITimeou
var store = await storeTask;
var functionId = TestFlowId.Create();

var timeoutProvider = new TimeoutProvider(
functionId,
store,
messageWriter: null,
timeoutCheckFrequency: TimeSpan.Zero
);
var timeoutProvider = new TimeoutProvider(functionId, store);

await timeoutProvider.RegisterTimeout("timeoutId1", expiresIn: TimeSpan.FromHours(1));
await timeoutProvider.RegisterTimeout("timeoutId2", expiresIn: TimeSpan.FromHours(2));
Expand All @@ -131,18 +126,11 @@ protected async Task RegisteredTimeoutIsReturnedFromTimeoutProviderForFunctionId
var store = await storeTask;
var functionId = TestFlowId.Create();

var timeoutProvider = new TimeoutProvider(
functionId,
store,
messageWriter: null,
timeoutCheckFrequency: TimeSpan.Zero
);
var timeoutProvider = new TimeoutProvider(functionId, store);

var otherInstanceTimeoutProvider = new TimeoutProvider(
new FlowId(functionId.Type, functionId.Instance.Value + "2"),
store,
messageWriter: null,
timeoutCheckFrequency: TimeSpan.Zero
new FlowId(functionId.Type, functionId.Instance.Value + "2"),
store
);

await timeoutProvider.RegisterTimeout("timeoutId1", expiresIn: TimeSpan.FromHours(1));
Expand All @@ -164,12 +152,7 @@ protected async Task TimeoutIsNotRegisteredAgainWhenProviderAlreadyContainsTimeo
var store = new TimeoutStoreDecorator(await storeTask, () => upsertCount++);
var functionId = TestFlowId.Create();

var timeoutProvider = new TimeoutProvider(
functionId,
store,
messageWriter: null,
timeoutCheckFrequency: TimeSpan.Zero
);
var timeoutProvider = new TimeoutProvider(functionId, store);

await timeoutProvider.RegisterTimeout("timeoutId1", expiresIn: TimeSpan.FromHours(1));
upsertCount.ShouldBe(1);
Expand All @@ -189,12 +172,7 @@ protected async Task CancellingNonExistingTimeoutDoesNotResultInIO(Task<ITimeout
var store = new TimeoutStoreDecorator(await storeTask, removeTimeoutCallback: () => removeCount++);
var functionId = TestFlowId.Create();

var timeoutProvider = new TimeoutProvider(
functionId,
store,
messageWriter: null,
timeoutCheckFrequency: TimeSpan.Zero
);
var timeoutProvider = new TimeoutProvider(functionId, store);

var pendingTimeouts = await timeoutProvider.PendingTimeouts();
pendingTimeouts.ShouldBeEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,7 @@ await _functionStore.BulkScheduleFunctions(
public async Task<Messages> CreateMessages(FlowId flowId, ScheduleReInvocation scheduleReInvocation, Func<bool> isWorkflowRunning, bool sync)
{
var messageWriter = new MessageWriter(flowId, _functionStore, Serializer, scheduleReInvocation);
var timeoutProvider = new TimeoutProvider(
flowId,
_functionStore.TimeoutStore,
messageWriter,
_settings.WatchdogCheckFrequency
);
var timeoutProvider = new TimeoutProvider(flowId, _functionStore.TimeoutStore);
var messagesPullerAndEmitter = new MessagesPullerAndEmitter(
flowId,
defaultDelay: _settings.MessagesPullFrequency,
Expand Down
81 changes: 34 additions & 47 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/TimeoutProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Events;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.CoreRuntime;
Expand All @@ -15,88 +13,77 @@ public interface ITimeoutProvider
Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false);
Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false);
Task CancelTimeout(string timeoutId);
Task<List<TimeoutEvent>> PendingTimeouts();
Task<IReadOnlyList<TimeoutEvent>> PendingTimeouts();
}

public class TimeoutProvider : ITimeoutProvider
{
private readonly ITimeoutStore _timeoutStore;

private readonly MessageWriter? _messageWriter;
private readonly TimeSpan _timeoutCheckFrequency;
private readonly HashSet<string> _localTimeouts = new();
private Dictionary<string, TimeoutEvent>? _localTimeouts2;
private readonly object _sync = new();

private readonly FlowId _flowId;

public TimeoutProvider(FlowId flowId, ITimeoutStore timeoutStore, MessageWriter? messageWriter, TimeSpan timeoutCheckFrequency)
public TimeoutProvider(FlowId flowId, ITimeoutStore timeoutStore)
{
_timeoutStore = timeoutStore;
_messageWriter = messageWriter;
_timeoutCheckFrequency = timeoutCheckFrequency;
_flowId = flowId;
}

private async Task<Dictionary<string, TimeoutEvent>> GetRegisteredTimeouts()
{
lock (_sync)
if (_localTimeouts2 is not null)
return _localTimeouts2;

var timeouts = await _timeoutStore.GetTimeouts(_flowId);
var localTimeouts = timeouts
.ToDictionary(
t => t.TimeoutId,
t => new TimeoutEvent(t.TimeoutId, new DateTime(t.Expiry).ToUniversalTime())
);

lock (_sync)
if (_localTimeouts2 is null)
_localTimeouts2 = localTimeouts;
else
localTimeouts = _localTimeouts2;

return localTimeouts;
}

public async Task RegisterTimeout(string timeoutId, DateTime expiresAt, bool overwrite = false)
{
var registeredTimeouts = await GetRegisteredTimeouts();
lock (_sync)
if (_localTimeouts.Contains(timeoutId) && !overwrite)
if (registeredTimeouts.ContainsKey(timeoutId) && !overwrite)
return;
else
registeredTimeouts[timeoutId] = new TimeoutEvent(timeoutId, expiresAt.ToUniversalTime());

expiresAt = expiresAt.ToUniversalTime();
_ = RegisterLocalTimeout(timeoutId, expiresAt);
await _timeoutStore.UpsertTimeout(new StoredTimeout(_flowId, timeoutId, expiresAt.Ticks), overwrite);
}

public Task RegisterTimeout(string timeoutId, TimeSpan expiresIn, bool overwrite = false)
=> RegisterTimeout(timeoutId, expiresAt: DateTime.UtcNow.Add(expiresIn), overwrite);

private async Task RegisterLocalTimeout(string timeoutId, DateTime expiresAt)
{
if (_messageWriter == null) return;

var expiresIn = expiresAt - DateTime.UtcNow;
if (expiresIn > _timeoutCheckFrequency) return;

lock (_sync)
_localTimeouts.Add(timeoutId);

await Task.Delay(expiresIn.RoundUpToZero());

lock (_sync)
if (!_localTimeouts.Contains(timeoutId)) return;

await _messageWriter.AppendMessage(
new TimeoutEvent(timeoutId, expiresAt),
idempotencyKey: $"Timeout¤{timeoutId}"
);

await CancelTimeout(timeoutId);
}

public async Task CancelTimeout(string timeoutId)
{
var registeredTimeouts = await GetRegisteredTimeouts();
lock (_sync)
if (!_localTimeouts.Remove(timeoutId))
if (!registeredTimeouts.Remove(timeoutId))
return;

await _timeoutStore.RemoveTimeout(_flowId, timeoutId);
}

public async Task<List<TimeoutEvent>> PendingTimeouts()
public async Task<IReadOnlyList<TimeoutEvent>> PendingTimeouts()
{
var timeouts = (await _timeoutStore.GetTimeouts(_flowId)).ToList();
var registeredTimeouts = await GetRegisteredTimeouts();

lock (_sync)
{
_localTimeouts.Clear();
foreach (var timeout in timeouts)
_localTimeouts.Add(timeout.TimeoutId);
}

return timeouts
.Where(t => t.FlowId == _flowId)
.Select(t => new TimeoutEvent(t.TimeoutId, Expiration: new DateTime(t.Expiry).ToUniversalTime()))
.ToList();
return registeredTimeouts.Values.ToList();
}
}

0 comments on commit 8af93bd

Please sign in to comment.