Skip to content

Commit

Permalink
Remove eager sync on construction of Correlations
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Aug 1, 2024
1 parent e2bcb7c commit c01f923
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1521,14 +1521,14 @@ protected async Task CorrelationsCanBeChanged(Task<IFunctionStore> storeTask)
var controlPanel = await registration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();

controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeTrue();
await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeTrueAsync();
await controlPanel.Correlations.Remove("SomeCorrelation");
await controlPanel.Correlations.Register("SomeNewCorrelation");

controlPanel = await registration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();
controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalse();
controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrue();
await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalseAsync();
await controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrueAsync();
}

public abstract Task DeleteRemovesFunctionFromAllStores();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,18 +406,10 @@ public async Task<States> CreateStates(FlowId flowId, string? defaultState, bool
return new States(flowId, defaultState, existingStoredStates, _functionStore, statesStore, serializer);
}

public async Task<Correlations> CreateCorrelations(FlowId flowId, bool sync)
public Correlations CreateCorrelations(FlowId flowId)
{
var correlationStore = _functionStore.CorrelationStore;
if (!sync)
return new Correlations(
flowId,
existingCorrelations: [],
correlationStore
);

var existingCorrelations = await correlationStore.GetCorrelations(flowId);
return new Correlations(flowId, existingCorrelations, correlationStore);
return new Correlations(flowId, correlationStore);
}

public async Task<ExistingStates> GetExistingStates(FlowId flowId, string? defaultState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ await _invocationHelper.PersistFunctionInStore(

var effect = _invocationHelper.CreateEffect(flowId);
var states = await _invocationHelper.CreateStates(flowId, defaultState: null, sync: false);
var correlations = await _invocationHelper.CreateCorrelations(flowId, sync: false);
var correlations = _invocationHelper.CreateCorrelations(flowId);
var workflow = new Workflow(flowId, messages, effect, states, _utilities, correlations);

return new PreparedInvocation(
Expand Down Expand Up @@ -238,14 +238,14 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(FlowId flowId, R
isWorkflowRunning: () => !isWorkflowRunningDisposable.Disposed
);
var statesTask = Task.Run(() => _invocationHelper.CreateStates(flowId, defaultState, sync: true));
var correlationsTask = Task.Run(() => _invocationHelper.CreateCorrelations(flowId, sync: true));
var correlations = _invocationHelper.CreateCorrelations(flowId);
var workflow = new Workflow(
flowId,
messages,
_invocationHelper.CreateEffect(flowId),
await statesTask,
_utilities,
await correlationsTask
correlations
);

return new PreparedReInvocation(
Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public async Task Refresh()
Effects = await _invocationHelper.GetExistingEffects(FlowId);
States = await _invocationHelper.GetExistingStates(FlowId, sf.DefaultState);
Timeouts = await _invocationHelper.GetExistingTimeouts(FlowId);
Correlations = await _invocationHelper.CreateCorrelations(FlowId, sync: true);
Correlations = _invocationHelper.CreateCorrelations(FlowId);

_innerParamChanged = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
await _invocationHelper.CreateCorrelations(functionId, sync: true),
_invocationHelper.CreateCorrelations(functionId),
functionState.PreviouslyThrownException
);
}
Expand Down Expand Up @@ -76,7 +76,7 @@ await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, functionState.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
await _invocationHelper.CreateCorrelations(functionId, sync: true),
_invocationHelper.CreateCorrelations(functionId),
functionState.PreviouslyThrownException
);
}
Expand Down Expand Up @@ -116,7 +116,7 @@ await _invocationHelper.GetExistingEffects(functionId),
await _invocationHelper.GetExistingStates(functionId, f.DefaultState),
await _invocationHelper.GetExistingMessages(functionId),
await _invocationHelper.GetExistingTimeouts(functionId),
await _invocationHelper.CreateCorrelations(functionId, sync: true),
_invocationHelper.CreateCorrelations(functionId),
f.PreviouslyThrownException
);
}
Expand Down
48 changes: 28 additions & 20 deletions Core/Cleipnir.ResilientFunctions/Domain/Correlations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,59 @@

namespace Cleipnir.ResilientFunctions.Domain;

public class Correlations
public class Correlations(FlowId flowId, ICorrelationStore correlationStore)
{
private readonly FlowId _flowId;
private readonly HashSet<string> _registered;
private readonly ICorrelationStore _correlationStore;
private HashSet<string>? _correlations;
private readonly object _sync = new();

public Correlations(
FlowId flowId,
IEnumerable<string> existingCorrelations,
ICorrelationStore correlationStore
)
private async Task<HashSet<string>> GetCorrelations()
{
_flowId = flowId;
_registered = existingCorrelations.ToHashSet();
_correlationStore = correlationStore;
lock (_sync)
if (_correlations is not null)
return _correlations;

var correlations = (await correlationStore.GetCorrelations(flowId))
.ToHashSet();

lock (_sync)
if (_correlations is null)
return _correlations = correlations;
else
return _correlations;
}

public async Task Register(string correlation)
{
var registered = await GetCorrelations();

lock (_sync)
if (_registered.Contains(correlation))
if (registered.Contains(correlation))
return;

await _correlationStore.SetCorrelation(_flowId, correlation);
await correlationStore.SetCorrelation(flowId, correlation);

lock (_sync)
_registered.Add(correlation);
registered.Add(correlation);
}

public bool Contains(string correlation)
public async Task<bool> Contains(string correlation)
{
var registered = await GetCorrelations();
lock (_sync)
return _registered.Contains(correlation);
return registered.Contains(correlation);
}

public async Task Remove(string correlation)
{
var registered = await GetCorrelations();

lock (_sync)
if (!_registered.Contains(correlation))
if (!registered.Contains(correlation))
return;

await _correlationStore.RemoveCorrelation(_flowId, correlation);
await correlationStore.RemoveCorrelation(flowId, correlation);

lock (_sync)
_registered.Remove(correlation);
registered.Remove(correlation);
}
}

0 comments on commit c01f923

Please sign in to comment.