diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/AsyncSemaphoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/AsyncSemaphoreTests.cs index d17e1985..d511f1a0 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/AsyncSemaphoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/AsyncSemaphoreTests.cs @@ -1,4 +1,5 @@ using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Tests.Utils; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/AsyncSemaphore.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/AsyncSemaphore.cs similarity index 69% rename from Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/AsyncSemaphore.cs rename to Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/AsyncSemaphore.cs index 1351d024..00f90394 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/AsyncSemaphore.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/AsyncSemaphore.cs @@ -1,8 +1,9 @@ using System; using System.Threading; using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Helpers.Disposables; -namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs; +namespace Cleipnir.ResilientFunctions.CoreRuntime.Invocation; public class AsyncSemaphore { @@ -16,6 +17,16 @@ public async Task Take() return new Lock(_semaphore); } + public bool TryTake(out IDisposable @lock) + { + var success = _semaphore.WaitAsync(timeout: TimeSpan.Zero).Result; + @lock = success + ? new Lock(_semaphore) + : Disposable.NoOp(); + + return success; + } + private class Lock : IDisposable { private readonly SemaphoreSlim _semaphore; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/ReInvoke.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/ReInvoke.cs new file mode 100644 index 00000000..fee8af07 --- /dev/null +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/ReInvoke.cs @@ -0,0 +1,5 @@ +using System.Threading.Tasks; + +namespace Cleipnir.ResilientFunctions.CoreRuntime.Invocation; + +public delegate Task ReInvoke(string functionInstanceId, int expectedEpoch); \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/CrashedWatchdog.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/CrashedWatchdog.cs index 8c3a840b..e750af7b 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/CrashedWatchdog.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/CrashedWatchdog.cs @@ -12,22 +12,22 @@ namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs; internal class CrashedWatchdog { private readonly FunctionTypeId _functionTypeId; - private readonly ScheduleReInvocation _reInvoke; + private readonly ReInvoke _reInvoke; private readonly IFunctionStore _functionStore; private readonly TimeSpan _signOfLifeFrequency; private readonly TimeSpan _delayStartUp; private readonly UnhandledExceptionHandler _unhandledExceptionHandler; private readonly ShutdownCoordinator _shutdownCoordinator; - private readonly AsyncSemaphore _asyncSemaphore; + private readonly AsyncSemaphore _maxParallelismSemaphore; private readonly HashSet _toBeExecuted = new(); private readonly object _sync = new(); public CrashedWatchdog( FunctionTypeId functionTypeId, IFunctionStore functionStore, - ScheduleReInvocation reInvoke, - AsyncSemaphore asyncSemaphore, + ReInvoke reInvoke, + AsyncSemaphore maxParallelismSemaphore, TimeSpan signOfLifeFrequency, TimeSpan delayStartUp, UnhandledExceptionHandler unhandledExceptionHandler, @@ -36,7 +36,7 @@ public CrashedWatchdog( _functionTypeId = functionTypeId; _functionStore = functionStore; _reInvoke = reInvoke; - _asyncSemaphore = asyncSemaphore; + _maxParallelismSemaphore = maxParallelismSemaphore; _signOfLifeFrequency = signOfLifeFrequency; _delayStartUp = delayStartUp; _unhandledExceptionHandler = unhandledExceptionHandler; @@ -76,12 +76,10 @@ public async Task Start() private async Task ReInvokeCrashedFunction(StoredExecutingFunction sef) { lock (_sync) - if (_toBeExecuted.Contains(sef.InstanceId)) + if (!_toBeExecuted.Add(sef.InstanceId)) return; - else - _toBeExecuted.Add(sef.InstanceId); - - using var @lock = await _asyncSemaphore.Take(); + + using var @lock = await _maxParallelismSemaphore.Take(); if (_shutdownCoordinator.ShutdownInitiated || sef.LeaseExpiration > DateTime.UtcNow.Ticks) return; try diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/PostponedWatchdog.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/PostponedWatchdog.cs index cd1d369c..5a632beb 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/PostponedWatchdog.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/PostponedWatchdog.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; @@ -13,17 +14,20 @@ internal class PostponedWatchdog private readonly IFunctionStore _functionStore; private readonly UnhandledExceptionHandler _unhandledExceptionHandler; private readonly ShutdownCoordinator _shutdownCoordinator; - private readonly ScheduleReInvocation _reInvoke; - private readonly AsyncSemaphore _asyncSemaphore; + private readonly ReInvoke _reInvoke; + private readonly AsyncSemaphore _maxParallelismSemaphore; private readonly TimeSpan _postponedCheckFrequency; private readonly TimeSpan _delayStartUp; private readonly FunctionTypeId _functionTypeId; + + private readonly HashSet _toBeExecuted = new(); + private readonly object _sync = new(); public PostponedWatchdog( FunctionTypeId functionTypeId, IFunctionStore functionStore, - ScheduleReInvocation reInvoke, - AsyncSemaphore asyncSemaphore, + ReInvoke reInvoke, + AsyncSemaphore maxParallelismSemaphore, TimeSpan postponedCheckFrequency, TimeSpan delayStartUp, UnhandledExceptionHandler unhandledExceptionHandler, @@ -34,7 +38,7 @@ public PostponedWatchdog( _unhandledExceptionHandler = unhandledExceptionHandler; _shutdownCoordinator = shutdownCoordinator; _reInvoke = reInvoke; - _asyncSemaphore = asyncSemaphore; + _maxParallelismSemaphore = maxParallelismSemaphore; _postponedCheckFrequency = postponedCheckFrequency; _delayStartUp = delayStartUp; } @@ -73,6 +77,10 @@ public async Task Start() private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime now) { + lock (_sync) + if (!_toBeExecuted.Add(spf.InstanceId)) + return; + var functionId = new FunctionId(_functionTypeId, spf.InstanceId); var postponedUntil = new DateTime(spf.PostponedUntil, DateTimeKind.Utc); @@ -81,7 +89,7 @@ private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime no if (_shutdownCoordinator.ShutdownInitiated) return; - using var @lock = await _asyncSemaphore.Take(); + using var @lock = await _maxParallelismSemaphore.Take(); try { while (DateTime.UtcNow < postponedUntil) //clock resolution means that we might wake up early @@ -107,5 +115,10 @@ private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime no ) ); } + finally + { + lock (_sync) + _toBeExecuted.Remove(spf.InstanceId); + } } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/WatchDogsFactory.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/WatchDogsFactory.cs index 648339a5..23baccaf 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/WatchDogsFactory.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/WatchDogsFactory.cs @@ -11,7 +11,7 @@ internal static class WatchDogsFactory public static void CreateAndStart( FunctionTypeId functionTypeId, IFunctionStore functionStore, - ScheduleReInvocation reInvoke, + ReInvoke reInvoke, SettingsWithDefaults settings, ShutdownCoordinator shutdownCoordinator) { diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs b/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs index 60c52381..ab3f79ca 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime; +using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.CoreRuntime.ParameterSerialization; using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs; using Cleipnir.ResilientFunctions.Domain; diff --git a/Core/Cleipnir.ResilientFunctions/RFunctions.cs b/Core/Cleipnir.ResilientFunctions/RFunctions.cs index 6ebefedf..2100b684 100644 --- a/Core/Cleipnir.ResilientFunctions/RFunctions.cs +++ b/Core/Cleipnir.ResilientFunctions/RFunctions.cs @@ -377,7 +377,7 @@ public RFunc RegisterFunc rFuncInvoker.ReInvoke(id.ToString(), epoch), + rFuncInvoker.ReInvoke, settingsWithDefaults, _shutdownCoordinator ); @@ -528,7 +528,7 @@ private RAction RegisterAction( WatchDogsFactory.CreateAndStart( functionTypeId, _functionStore, - reInvoke: (id, epoch) => rActionInvoker.ReInvoke(id.ToString(), epoch), + rActionInvoker.ReInvoke, settingsWithDefaults, _shutdownCoordinator );