diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index faa12bf3..5cea3e27 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -46,15 +46,20 @@ public void Deconstruct(out Effect effect, out Messages messages, out States sta public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation); - public Task Delay(TimeSpan @for, bool suspend = true) => Delay(until: DateTime.UtcNow + @for, suspend); - public Task Delay(DateTime until, bool suspend = true) + public Task Delay(TimeSpan @for, bool suspend = true, string? effectId = null) => Delay(until: DateTime.UtcNow + @for, suspend, effectId); + public async Task Delay(DateTime until, bool suspend = true, string? effectId = null) { - if (until <= DateTime.UtcNow) - return Task.CompletedTask; + effectId ??= $"Delay#{Effect.TakeNextImplicitId()}"; + var systemEffectId = EffectId.CreateWithCurrentContext(effectId, EffectType.System); + until = await Effect.CreateOrGet(systemEffectId, until); + var delay = (until - DateTime.UtcNow).RoundUpToZero(); + if (delay == TimeSpan.Zero) + return; + if (!suspend) - return Task.Delay((until - DateTime.UtcNow).RoundUpToZero()); - - throw new PostponeInvocationException(until); + await Task.Delay(delay); + else + throw new PostponeInvocationException(until); } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index c2110621..c04b6466 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -8,7 +8,6 @@ using Cleipnir.ResilientFunctions.Domain.Exceptions; using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Reactive.Utilities; using Cleipnir.ResilientFunctions.Storage; @@ -89,17 +88,17 @@ public async Task Mark(string id) return true; } - public async Task CreateOrGet(string id, T value) + public Task CreateOrGet(string id, T value) => CreateOrGet(CreateEffectId(id), value); + internal async Task CreateOrGet(EffectId effectId, T value) { var effectResults = await GetEffectResults(); - var effectId = CreateEffectId(id); lock (_sync) { if (effectResults.TryGetValue(effectId, out var existing) && existing.WorkStatus == WorkStatus.Completed) return serializer.DeserializeEffectResult(existing.Result!); if (existing?.StoredException != null) - throw new EffectException(flowType, id, serializer.DeserializeException(existing.StoredException!)); + throw new EffectException(flowType, effectId.Id, serializer.DeserializeException(existing.StoredException!)); } var storedEffect = StoredEffect.CreateCompleted(effectId, serializer.SerializeEffectResult(value)); @@ -179,9 +178,6 @@ public async Task Capture(string id, Func work, ResiliencyLevel resiliency public async Task Capture(string id, Func> work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) => await InnerCapture(id, EffectType.Effect, work, resiliency, EffectContext.CurrentContext); - internal async Task SystemCapture(string id, Func work, ResiliencyLevel resiliency = ResiliencyLevel.AtLeastOnce) - => await InnerCapture(id, EffectType.System, work, resiliency, EffectContext.CurrentContext); - private async Task InnerCapture(string id, EffectType effectType, Func work, ResiliencyLevel resiliency, EffectContext effectContext) { Delimiters.EnsureNoUnitSeparator(id);