Skip to content

Commit

Permalink
Fixed Workflow delay issue
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Jan 11, 2025
1 parent fb028e6 commit e6a1e54
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ public void Deconstruct(out Effect effect, out Messages messages, out States sta

public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation);

public Task Delay(TimeSpan @for, bool suspend = true) => Delay(until: DateTime.UtcNow + @for, suspend);
public Task Delay(DateTime until, bool suspend = true)
public Task Delay(TimeSpan @for, bool suspend = true, string? effectId = null) => Delay(until: DateTime.UtcNow + @for, suspend, effectId);
public async Task Delay(DateTime until, bool suspend = true, string? effectId = null)
{
if (until <= DateTime.UtcNow)
return Task.CompletedTask;
effectId ??= $"Delay#{Effect.TakeNextImplicitId()}";
var systemEffectId = EffectId.CreateWithCurrentContext(effectId, EffectType.System);
until = await Effect.CreateOrGet(systemEffectId, until);
var delay = (until - DateTime.UtcNow).RoundUpToZero();

if (delay == TimeSpan.Zero)
return;

if (!suspend)
return Task.Delay((until - DateTime.UtcNow).RoundUpToZero());

throw new PostponeInvocationException(until);
await Task.Delay(delay);
else
throw new PostponeInvocationException(until);
}
}
10 changes: 3 additions & 7 deletions Core/Cleipnir.ResilientFunctions/Domain/Effect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using Cleipnir.ResilientFunctions.Domain.Exceptions;
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Reactive.Utilities;
using Cleipnir.ResilientFunctions.Storage;

Expand Down Expand Up @@ -89,17 +88,17 @@ public async Task<bool> Mark(string id)
return true;
}

public async Task<T> CreateOrGet<T>(string id, T value)
public Task<T> CreateOrGet<T>(string id, T value) => CreateOrGet(CreateEffectId(id), value);
internal async Task<T> CreateOrGet<T>(EffectId effectId, T value)
{
var effectResults = await GetEffectResults();
var effectId = CreateEffectId(id);
lock (_sync)
{
if (effectResults.TryGetValue(effectId, out var existing) && existing.WorkStatus == WorkStatus.Completed)
return serializer.DeserializeEffectResult<T>(existing.Result!);

if (existing?.StoredException != null)
throw new EffectException(flowType, id, serializer.DeserializeException(existing.StoredException!));
throw new EffectException(flowType, effectId.Id, serializer.DeserializeException(existing.StoredException!));
}

var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.SerializeEffectResult(value));
Expand Down Expand Up @@ -179,9 +178,6 @@ public async Task Capture(string id, Func<Task> work, ResiliencyLevel resiliency
public async Task<T> Capture<T>(string id, Func<Task<T>> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce)
=> await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext);

internal async Task SystemCapture(string id, Func<Task> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce)
=> await InnerCapture(id, EffectType.System, work, resiliency, EffectContext.CurrentContext);

private async Task InnerCapture(string id, EffectType effectType, Func<Task> work, ResiliencyLevel resiliency, EffectContext effectContext)
{
Delimiters.EnsureNoUnitSeparator(id);
Expand Down

0 comments on commit e6a1e54

Please sign in to comment.