Skip to content

Commit

Permalink
Added to-be-executed hashset to PostponedWatchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Jan 5, 2024
1 parent 90eb996 commit 34f7423
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Tests.Utils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers.Disposables;

namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
namespace Cleipnir.ResilientFunctions.CoreRuntime.Invocation;

public class AsyncSemaphore
{
Expand All @@ -16,6 +17,16 @@ public async Task<IDisposable> Take()
return new Lock(_semaphore);
}

/// <summary>
/// Attempts to acquire the underlying semaphore without waiting.
/// </summary>
/// <param name="lock">
/// When the semaphore was acquired, a <c>Lock</c> wrapping it (presumably releasing the
/// semaphore on dispose; the <c>Lock</c> body is not visible here, confirm).
/// Otherwise the instance returned by <c>Disposable.NoOp()</c>, so callers can always dispose the result.
/// </param>
/// <returns><c>true</c> if the semaphore was acquired; otherwise <c>false</c>.</returns>
public bool TryTake(out IDisposable @lock)
{
    // A zero timeout makes Wait probe the semaphore and return immediately.
    // Using the synchronous overload avoids blocking on a Task via .Result.
    var success = _semaphore.Wait(TimeSpan.Zero);
    @lock = success
        ? new Lock(_semaphore)
        : Disposable.NoOp();

    return success;
}

private class Lock : IDisposable
{
private readonly SemaphoreSlim _semaphore;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.CoreRuntime.Invocation;

/// <summary>
/// Callback signature taking a function instance id and an expected epoch and returning a <see cref="Task"/>.
/// NOTE(review): judging by the name and by the watchdogs receiving it as their <c>reInvoke</c> argument,
/// this presumably re-invokes the identified function instance if its stored epoch matches; confirm against the invoker.
/// </summary>
/// <param name="functionInstanceId">String form of the function instance id.</param>
/// <param name="expectedEpoch">Epoch value the caller expects (exact semantics not visible here).</param>
public delegate Task ReInvoke(string functionInstanceId, int expectedEpoch);
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
internal class CrashedWatchdog
{
private readonly FunctionTypeId _functionTypeId;
private readonly ScheduleReInvocation _reInvoke;
private readonly ReInvoke _reInvoke;
private readonly IFunctionStore _functionStore;
private readonly TimeSpan _signOfLifeFrequency;
private readonly TimeSpan _delayStartUp;
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
private readonly ShutdownCoordinator _shutdownCoordinator;

private readonly AsyncSemaphore _asyncSemaphore;
private readonly AsyncSemaphore _maxParallelismSemaphore;
private readonly HashSet<FunctionInstanceId> _toBeExecuted = new();
private readonly object _sync = new();

public CrashedWatchdog(
FunctionTypeId functionTypeId,
IFunctionStore functionStore,
ScheduleReInvocation reInvoke,
AsyncSemaphore asyncSemaphore,
ReInvoke reInvoke,
AsyncSemaphore maxParallelismSemaphore,
TimeSpan signOfLifeFrequency,
TimeSpan delayStartUp,
UnhandledExceptionHandler unhandledExceptionHandler,
Expand All @@ -36,7 +36,7 @@ public CrashedWatchdog(
_functionTypeId = functionTypeId;
_functionStore = functionStore;
_reInvoke = reInvoke;
_asyncSemaphore = asyncSemaphore;
_maxParallelismSemaphore = maxParallelismSemaphore;
_signOfLifeFrequency = signOfLifeFrequency;
_delayStartUp = delayStartUp;
_unhandledExceptionHandler = unhandledExceptionHandler;
Expand Down Expand Up @@ -76,12 +76,10 @@ public async Task Start()
private async Task ReInvokeCrashedFunction(StoredExecutingFunction sef)
{
lock (_sync)
if (_toBeExecuted.Contains(sef.InstanceId))
if (!_toBeExecuted.Add(sef.InstanceId))
return;
else
_toBeExecuted.Add(sef.InstanceId);

using var @lock = await _asyncSemaphore.Take();

using var @lock = await _maxParallelismSemaphore.Take();

if (_shutdownCoordinator.ShutdownInitiated || sef.LeaseExpiration > DateTime.UtcNow.Ticks) return;
try
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
using Cleipnir.ResilientFunctions.Domain;
Expand All @@ -13,17 +14,20 @@ internal class PostponedWatchdog
private readonly IFunctionStore _functionStore;
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
private readonly ShutdownCoordinator _shutdownCoordinator;
private readonly ScheduleReInvocation _reInvoke;
private readonly AsyncSemaphore _asyncSemaphore;
private readonly ReInvoke _reInvoke;
private readonly AsyncSemaphore _maxParallelismSemaphore;
private readonly TimeSpan _postponedCheckFrequency;
private readonly TimeSpan _delayStartUp;
private readonly FunctionTypeId _functionTypeId;

private readonly HashSet<FunctionInstanceId> _toBeExecuted = new();
private readonly object _sync = new();

public PostponedWatchdog(
FunctionTypeId functionTypeId,
IFunctionStore functionStore,
ScheduleReInvocation reInvoke,
AsyncSemaphore asyncSemaphore,
ReInvoke reInvoke,
AsyncSemaphore maxParallelismSemaphore,
TimeSpan postponedCheckFrequency,
TimeSpan delayStartUp,
UnhandledExceptionHandler unhandledExceptionHandler,
Expand All @@ -34,7 +38,7 @@ public PostponedWatchdog(
_unhandledExceptionHandler = unhandledExceptionHandler;
_shutdownCoordinator = shutdownCoordinator;
_reInvoke = reInvoke;
_asyncSemaphore = asyncSemaphore;
_maxParallelismSemaphore = maxParallelismSemaphore;
_postponedCheckFrequency = postponedCheckFrequency;
_delayStartUp = delayStartUp;
}
Expand Down Expand Up @@ -73,6 +77,10 @@ public async Task Start()

private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime now)
{
lock (_sync)
if (!_toBeExecuted.Add(spf.InstanceId))
return;

var functionId = new FunctionId(_functionTypeId, spf.InstanceId);

var postponedUntil = new DateTime(spf.PostponedUntil, DateTimeKind.Utc);
Expand All @@ -81,7 +89,7 @@ private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime no

if (_shutdownCoordinator.ShutdownInitiated) return;

using var @lock = await _asyncSemaphore.Take();
using var @lock = await _maxParallelismSemaphore.Take();
try
{
while (DateTime.UtcNow < postponedUntil) //clock resolution means that we might wake up early
Expand All @@ -107,5 +115,10 @@ private async Task SleepAndThenReInvoke(StoredPostponedFunction spf, DateTime no
)
);
}
finally
{
lock (_sync)
_toBeExecuted.Remove(spf.InstanceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal static class WatchDogsFactory
public static void CreateAndStart(
FunctionTypeId functionTypeId,
IFunctionStore functionStore,
ScheduleReInvocation reInvoke,
ReInvoke reInvoke,
SettingsWithDefaults settings,
ShutdownCoordinator shutdownCoordinator)
{
Expand Down
1 change: 1 addition & 0 deletions Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
using Cleipnir.ResilientFunctions.CoreRuntime.ParameterSerialization;
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
using Cleipnir.ResilientFunctions.Domain;
Expand Down
4 changes: 2 additions & 2 deletions Core/Cleipnir.ResilientFunctions/RFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public RFunc<TParam, TScrapbook, TReturn> RegisterFunc<TParam, TScrapbook, TRetu
WatchDogsFactory.CreateAndStart(
functionTypeId,
_functionStore,
reInvoke: (id, epoch) => rFuncInvoker.ReInvoke(id.ToString(), epoch),
rFuncInvoker.ReInvoke,
settingsWithDefaults,
_shutdownCoordinator
);
Expand Down Expand Up @@ -528,7 +528,7 @@ private RAction<TParam, TScrapbook> RegisterAction<TParam, TScrapbook>(
WatchDogsFactory.CreateAndStart(
functionTypeId,
_functionStore,
reInvoke: (id, epoch) => rActionInvoker.ReInvoke(id.ToString(), epoch),
rActionInvoker.ReInvoke,
settingsWithDefaults,
_shutdownCoordinator
);
Expand Down

0 comments on commit 34f7423

Please sign in to comment.