Skip to content

Commit

Permalink
Robustifying CrashedOrPostponedWatchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Sep 15, 2024
1 parent a808463 commit 68be4a6
Showing 1 changed file with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void Register(
if (!isStarted)
Task.Run(Start);
}
public async Task Start()

private async Task Start()
{
await Task.Delay(_delayStartUp);

Expand All @@ -76,40 +76,48 @@ public async Task Start()
#endif

var flowsDictionary = _flowsDictionary;

foreach (var sef in eligibleFunctions.WithRandomOffset())
{
if (!_flowsDictionary.TryGetValue(sef.FlowId.Type, out var tuple))
if (!flowsDictionary.TryGetValue(sef.FlowId.Type, out var tuple))
continue;

var (restartFunction, scheduleRestart, asyncSemaphore) = tuple;
if (!asyncSemaphore.TryTake(out var takenLock))
break;
continue;

var runningFunction = _shutdownCoordinator.TryRegisterRunningFunction();
if (runningFunction == null)
{
takenLock.Dispose();
return;
}

var restartedFunction = await restartFunction(sef.FlowId, sef.Epoch);
if (restartedFunction == null)
{
runningFunction.Dispose();
takenLock.Dispose();
break;
}

await scheduleRestart(
sef.FlowId.Instance,
restartedFunction,
onCompletion: () =>
try
{
var restartedFunction = await restartFunction(sef.FlowId, sef.Epoch);
if (restartedFunction == null)
{
takenLock.Dispose();
runningFunction.Dispose();
takenLock.Dispose();
break;
}
);

await scheduleRestart(
sef.FlowId.Instance,
restartedFunction,
onCompletion: () =>
{
takenLock.Dispose();
runningFunction.Dispose();
}
);
}
catch
{
runningFunction.Dispose();
takenLock.Dispose();
throw;
}
}

var timeElapsed = DateTime.UtcNow - now;
Expand Down

0 comments on commit 68be4a6

Please sign in to comment.