Skip to content

Commit

Permalink
Remove eager sync on construction of ExistingEffects
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Aug 2, 2024
1 parent 6a786f5 commit 879477c
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ await BusyWait.Until(() =>
);

var controlPanel = await rAction.ControlPanel(flowInstance);
controlPanel!.Effects.GetValue<int>("id").ShouldBe(1);
await controlPanel!.Effects.GetValue<int>("id").ShouldBeAsync(1);
counter.Current.ShouldBe(2);
}

Expand Down Expand Up @@ -189,7 +189,7 @@ await workflow.Effect
await controlPanel.Refresh();

var value = controlPanel.Effects.GetValue<string>("someId");
value.ShouldBe("hello world");
await value.ShouldBeAsync("hello world");
counter.Current.ShouldBe(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ await workflow.Effect
await controlPanel.Refresh();

var value = controlPanel.Effects.GetValue<string>("someId");
value.ShouldBe("hello world");
await value.ShouldBeAsync("hello world");
}

public abstract Task CompletedAtMostOnceWorkWithCallIdAndGenericResultIsNotExecutedMultipleTimes();
Expand Down Expand Up @@ -214,6 +214,6 @@ await workflow.Effect
await controlPanel.Refresh();

var value = controlPanel.Effects.GetValue<Person>("someId");
value.ShouldBe(new Person("Peter", 32));
await value.ShouldBeAsync(new Person("Peter", 32));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1270,12 +1270,12 @@ protected async Task EffectsAreUpdatedAfterRefresh(Task<IFunctionStore> storeTas

var secondControlPanel = await rAction.ControlPanel(flowInstance.Value);
secondControlPanel.ShouldNotBeNull();

await secondControlPanel.Effects.HasValue("Id").ShouldBeAsync(false);

await firstControlPanel.Effects.SetSucceeded("Id", "SomeResult");

secondControlPanel.Effects.HasValue("Id").ShouldBe(false);
await secondControlPanel.Refresh();
secondControlPanel.Effects.GetValue<string>("Id").ShouldBe("SomeResult");
await secondControlPanel.Effects.GetValue<string>("Id").ShouldBeAsync("SomeResult");

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async Task (string param, Workflow workflow) =>
await controlPanel.Restart();

await controlPanel.Refresh();
controlPanel.Effects.All.Count.ShouldBe(0);
(await controlPanel.Effects).All.Count.ShouldBe(0);
}

public abstract Task EffectsCrudTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,6 @@ await Should.ThrowAsync<FunctionInvocationPostponedException>(
controlPanel.ShouldNotBeNull();

var delay = controlPanel.Effects.GetValue<DateTime>("Delay");
delay.ShouldBe(tomorrow);
await delay.ShouldBeAsync(tomorrow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ await _functionStore.StatesStore.GetStates(flowId),
_settings.Serializer
);

public async Task<ExistingEffects> GetExistingEffects(FlowId flowId)
public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
{
var effectsStore = _functionStore.EffectsStore;
var existingEffects = await effectsStore.GetEffectResults(flowId);
Expand Down
17 changes: 7 additions & 10 deletions Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ internal ControlPanel(
InvocationHelper<Unit, Unit> invocationHelper,
FlowId flowId,
Status status, int epoch, long leaseExpiration,
DateTime? postponedUntil, ExistingEffects effects,
DateTime? postponedUntil,
ExistingStates states, ExistingMessages messages, ExistingTimeouts timeouts, Correlations correlations,
PreviouslyThrownException? previouslyThrownException
) : base(
invoker, invocationHelper, flowId, status, epoch,
leaseExpiration, innerParam: Unit.Instance, innerResult: Unit.Instance, postponedUntil,
effects, states, messages, timeouts, correlations, previouslyThrownException
states, messages, timeouts, correlations, previouslyThrownException
) { }

public Task Succeed() => InnerSucceed(result: Unit.Instance);
Expand All @@ -32,13 +32,13 @@ internal ControlPanel(
InvocationHelper<TParam, Unit> invocationHelper,
FlowId flowId,
Status status, int epoch, long leaseExpiration, TParam innerParam,
DateTime? postponedUntil, ExistingEffects effects,
DateTime? postponedUntil,
ExistingStates states, ExistingMessages messages, ExistingTimeouts timeouts, Correlations correlations,
PreviouslyThrownException? previouslyThrownException
) : base(
invoker, invocationHelper, flowId, status, epoch,
leaseExpiration, innerParam, innerResult: Unit.Instance, postponedUntil,
effects, states, messages, timeouts, correlations, previouslyThrownException
states, messages, timeouts, correlations, previouslyThrownException
) { }

public TParam Param
Expand All @@ -58,11 +58,11 @@ internal ControlPanel(
FlowId flowId, Status status, int epoch,
long leaseExpiration, TParam innerParam,
TReturn? innerResult,
DateTime? postponedUntil, ExistingEffects effects, ExistingStates states, ExistingMessages messages,
DateTime? postponedUntil, ExistingStates states, ExistingMessages messages,
ExistingTimeouts timeouts, Correlations correlations, PreviouslyThrownException? previouslyThrownException
) : base(
invoker, invocationHelper, flowId, status, epoch, leaseExpiration,
innerParam, innerResult, postponedUntil, effects, states, messages,
innerParam, innerResult, postponedUntil, states, messages,
timeouts, correlations, previouslyThrownException
) { }

Expand Down Expand Up @@ -92,7 +92,6 @@ internal BaseControlPanel(
TParam innerParam,
TReturn? innerResult,
DateTime? postponedUntil,
ExistingEffects effects,
ExistingStates states,
ExistingMessages messages,
ExistingTimeouts timeouts,
Expand All @@ -109,7 +108,6 @@ internal BaseControlPanel(
_innerParam = innerParam;
InnerResult = innerResult;
PostponedUntil = postponedUntil;
Effects = effects;
States = states;
Messages = messages;
Timeouts = timeouts;
Expand All @@ -124,7 +122,7 @@ internal BaseControlPanel(
public DateTime LeaseExpiration { get; private set; }

public ExistingMessages Messages { get; private set; }
public ExistingEffects Effects { get; private set; }
public Task<ExistingEffects> Effects => _invocationHelper.CreateExistingEffects(FlowId);
public ExistingStates States { get; private set; }
public Correlations Correlations { get; private set; }

Expand Down Expand Up @@ -247,7 +245,6 @@ public async Task Refresh()
PostponedUntil = sf.PostponedUntil;
PreviouslyThrownException = sf.PreviouslyThrownException;
Messages = await _invocationHelper.GetExistingMessages(FlowId);
Effects = await _invocationHelper.GetExistingEffects(FlowId);
States = await _invocationHelper.GetExistingStates(FlowId, sf.DefaultState);
Timeouts = await _invocationHelper.GetExistingTimeouts(FlowId);
Correlations = _invocationHelper.CreateCorrelations(FlowId);
Expand Down
45 changes: 21 additions & 24 deletions Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,23 @@ internal ControlPanelFactory(FlowType flowType, Invoker<Unit, Unit> invoker, Inv

public async Task<ControlPanel?> Create(FlowInstance flowInstance)
{
var functionId = new FlowId(_flowType, flowInstance);
var functionState = await _invocationHelper.GetFunction(functionId);
var flowId = new FlowId(_flowType, flowInstance);
var functionState = await _invocationHelper.GetFunction(flowId);
if (functionState == null)
return null;

return new ControlPanel(
_invoker,
_invocationHelper,
functionId,
flowId,
functionState.Status,
functionState.Epoch,
functionState.LeaseExpiration,
functionState.PostponedUntil,
await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
_invocationHelper.CreateCorrelations(functionId),
await _invocationHelper.GetExistingStates(flowId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
}
Expand All @@ -58,25 +57,24 @@ internal ControlPanelFactory(FlowType flowType, Invoker<TParam, Unit> invoker, I

public async Task<ControlPanel<TParam>?> Create(FlowInstance flowInstance)
{
var functionId = new FlowId(_flowType, flowInstance);
var functionState = await _invocationHelper.GetFunction(functionId);
var flowId = new FlowId(_flowType, flowInstance);
var functionState = await _invocationHelper.GetFunction(flowId);
if (functionState == null)
return null;

return new ControlPanel<TParam>(
_invoker,
_invocationHelper,
functionId,
flowId,
functionState.Status,
functionState.Epoch,
functionState.LeaseExpiration,
functionState.Param!,
functionState.PostponedUntil,
await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
_invocationHelper.CreateCorrelations(functionId),
await _invocationHelper.GetExistingStates(flowId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
functionState.PreviouslyThrownException
);
}
Expand All @@ -97,26 +95,25 @@ internal ControlPanelFactory(FlowType flowType, Invoker<TParam, TReturn> invoker

public async Task<ControlPanel<TParam, TReturn>?> Create(FlowInstance flowInstance)
{
var functionId = new FlowId(_flowType, flowInstance);
var f = await _invocationHelper.GetFunction(functionId);
var flowId = new FlowId(_flowType, flowInstance);
var f = await _invocationHelper.GetFunction(flowId);
if (f == null)
return null;

return new ControlPanel<TParam, TReturn>(
_invoker,
_invocationHelper,
functionId,
flowId,
f.Status,
f.Epoch,
f.LeaseExpiration,
f.Param!,
f.Result,
f.PostponedUntil,
await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, f.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
_invocationHelper.CreateCorrelations(functionId),
await _invocationHelper.GetExistingStates(flowId, f.DefaultState),
await _invocationHelper.GetExistingMessages(flowId),
await _invocationHelper.GetExistingTimeouts(flowId),
_invocationHelper.CreateCorrelations(flowId),
f.PreviouslyThrownException
);
}
Expand Down
27 changes: 27 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,31 @@ public Task SetSucceeded<TResult>(string effectId, TResult result)

public Task SetFailed(string effectId, Exception exception)
=> Set(new StoredEffect(effectId, WorkStatus.Failed, Result: null, StoredException: _serializer.SerializeException(exception)));
}

public static class ExistingEffectsExtensions
{
public static async Task<bool> HasValue(this Task<ExistingEffects> existingEffects, string effectId)
=> (await existingEffects).HasValue(effectId);

public static async Task<TResult?> GetValue<TResult>(this Task<ExistingEffects> existingEffects, string effectId)
=> (await existingEffects).GetValue<TResult>(effectId);

public static async Task Remove(this Task<ExistingEffects> existingEffects, string effectId)
=> await (await existingEffects).Remove(effectId);

public static async Task SetValue<TValue>(this Task<ExistingEffects> existingEffects, string effectId, TValue value)
=> await (await existingEffects).SetValue(effectId, value);

public static async Task SetStarted(this Task<ExistingEffects> existingEffects, string effectId)
=> await (await existingEffects).SetStarted(effectId);

public static async Task SetSucceeded(this Task<ExistingEffects> existingEffects, string effectId)
=> await (await existingEffects).SetSucceeded(effectId);

public static async Task SetSucceeded<TResult>(this Task<ExistingEffects> existingEffects, string effectId, TResult result)
=> await (await existingEffects).SetSucceeded(effectId, result);

public static async Task SetFailed(this Task<ExistingEffects> existingEffects, string effectId, Exception exception)
=> await (await existingEffects).SetFailed(effectId, exception);
}

0 comments on commit 879477c

Please sign in to comment.