Skip to content

Commit

Permalink
Removed StateStore
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Oct 14, 2024
1 parent f479c20 commit 0fd1a96
Show file tree
Hide file tree
Showing 35 changed files with 125 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore

public IMessageStore MessageStore => _inner.MessageStore;
public IEffectsStore EffectsStore => _inner.EffectsStore;
public IStatesStore StatesStore => _inner.StatesStore;
public ITimeoutStore TimeoutStore => _inner.TimeoutStore;
public ICorrelationStore CorrelationStore => _inner.CorrelationStore;
public Utilities Utilities => _inner.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,28 @@ protected async Task SunshineScenarioTest(Task<IEffectsStore> storeTask)
);
await store
.GetEffectResults(functionId)
.ToListAsync()
.SelectAsync(l => l.Any())
.ShouldBeFalseAsync();

await store.SetEffectResult(functionId, storedEffect1);

var storedEffects = await store
.GetEffectResults(functionId)
.ToListAsync();
.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
var se = storedEffects[0];
se.ShouldBe(storedEffect1);

await store.SetEffectResult(functionId, storedEffect2);
storedEffects = await store.GetEffectResults(functionId).ToListAsync();
storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(2);
storedEffects[0].ShouldBe(storedEffect1);
storedEffects[1].ShouldBe(storedEffect2);

await store.SetEffectResult(functionId, storedEffect2);
await store.GetEffectResults(functionId).ToListAsync();
await store.GetEffectResults(functionId);

await store.SetEffectResult(functionId, storedEffect2);
storedEffects = await store.GetEffectResults(functionId).ToListAsync();
storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(2);
storedEffects[0].ShouldBe(storedEffect1);
storedEffects[1].ShouldBe(storedEffect2);
Expand Down Expand Up @@ -147,12 +145,12 @@ await store
.ShouldBeTrueAsync();

await store.DeleteEffectResult(functionId, storedEffect2.EffectId, isState: false);
var storedEffects = await store.GetEffectResults(functionId).ToListAsync();
var storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
storedEffects[0].EffectId.ShouldBe(storedEffect1.EffectId);

await store.DeleteEffectResult(functionId, storedEffect2.EffectId, isState: false);
storedEffects = await store.GetEffectResults(functionId).ToListAsync();
storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
storedEffects[0].EffectId.ShouldBe(storedEffect1.EffectId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ await store
.ShouldBeAsync(0);

await store
.StatesStore
.GetStates(functionId)
.EffectsStore
.GetEffectResults(functionId)
.SelectAsync(states => states.Count())
.ShouldBeAsync(0);

Expand Down Expand Up @@ -123,8 +123,8 @@ await store
.ShouldBeAsync(0);

await store
.StatesStore
.GetStates(functionId)
.EffectsStore
.GetEffectResults(functionId)
.SelectAsync(states => states.Count())
.ShouldBeAsync(0);

Expand Down Expand Up @@ -1657,10 +1657,6 @@ await store.TimeoutStore.GetTimeouts(functionId)
.SelectAsync(ts => ts.Any())
.ShouldBeFalseAsync();

await store.StatesStore.GetStates(functionId)
.SelectAsync(s => s.Any())
.ShouldBeFalseAsync();

await store.CorrelationStore.GetCorrelations(functionId)
.SelectAsync(c => c.Any())
.ShouldBeFalseAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ await store
var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();
storedFunction.Status.ShouldBe(Status.Succeeded);
var effects = await store.StatesStore.GetStates(functionId);
var stateResult = effects.Single(e => e.StateId == "State").StateJson;
var effects = await store.EffectsStore.GetEffectResults(functionId);
var stateResult = effects.Single(e => e.EffectId == "State").Result!;
stateResult.ShouldNotBeNull();
stateResult.DeserializeFromJsonTo<State>().Value.ShouldBe(1);
await rFunc(flowInstance.Value, param).ShouldBeAsync("TEST");
Expand Down Expand Up @@ -248,9 +248,9 @@ await store
var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();
storedFunction.Status.ShouldBe(Status.Succeeded);
var effects = await store.StatesStore.GetStates(functionId);
var state = effects.Single(e => e.StateId == "State").StateJson;
state.DeserializeFromJsonTo<State>().Value.ShouldBe(1);
var effects = await store.EffectsStore.GetEffectResults(functionId);
var state = effects.Single(e => e.EffectId == "State").Result;
state!.DeserializeFromJsonTo<State>().Value.ShouldBe(1);
await rAction(flowInstance.Value, param);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
Expand Down Expand Up @@ -319,7 +320,7 @@ public async Task EffectsCrudTest(Task<IFunctionStore> storeTask)
var flowId = TestFlowId.Create();
var effect = new Effect(
flowId,
existingEffectsFunc: () => store.EffectsStore.GetEffectResults(flowId),
lazyExstingEffects: new Lazy<Task<IReadOnlyList<StoredEffect>>>(() => store.EffectsStore.GetEffectResults(flowId)),
store.EffectsStore,
DefaultSerializer.Instance
);
Expand Down Expand Up @@ -355,11 +356,15 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti

var effect = new Effect(
flowId,
existingEffectsFunc: () =>
{
syncedCounter.Increment();
return Enumerable.Empty<StoredEffect>().ToTask();
},
lazyExstingEffects: new Lazy<Task<IReadOnlyList<StoredEffect>>>(
() =>
{
syncedCounter.Increment();
return new List<StoredEffect>()
.CastTo<IReadOnlyList<StoredEffect>>()
.ToTask();
})
,
store.EffectsStore,
DefaultSerializer.Instance
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ await BusyWait.Until(
var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();

var states = await store.StatesStore.GetStates(functionId);
var state = states.Single(e => e.StateId == "State");
state.StateJson.DeserializeFromJsonTo<State>().Value.ShouldBe(1);
var states = await store.EffectsStore.GetEffectResults(functionId);
var state = states.Single(e => e.EffectId == "State");
state.Result!.DeserializeFromJsonTo<State>().Value.ShouldBe(1);

await rFunc(param, param).ShouldBeAsync("TEST");
unhandledExceptionHandler.ShouldNotHaveExceptions();
Expand Down Expand Up @@ -243,9 +243,9 @@ await Should.ThrowAsync<InvocationPostponedException>(() =>
var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();

var states = await store.StatesStore.GetStates(functionId);
var state = states.Single(e => e.StateId == "State");
state.StateJson.DeserializeFromJsonTo<State>().Value.ShouldBe(1);
var states = await store.EffectsStore.GetEffectResults(functionId);
var state = states.Single(e => e.EffectId == "State");
state.Result!.DeserializeFromJsonTo<State>().Value.ShouldBe(1);

await rFunc(flowInstance.Value, param);
unhandledExceptionHandler.ShouldNotHaveExceptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ await BusyWait.Until(
var function = await store.GetFunction(functionId);
function.ShouldNotBeNull();
function.Status.ShouldBe(Status.Succeeded);
var states = await store.StatesStore.GetStates(functionId);
var state = states.Single(e => e.StateId == "State").StateJson.DeserializeFromJsonTo<ListState<string>>();
var states = await store.EffectsStore.GetEffectResults(functionId);
var state = states.Single(e => e.EffectId == "State").Result!.DeserializeFromJsonTo<ListState<string>>();
state.List.Single().ShouldBe("world");

unhandledExceptionCatcher.ShouldNotHaveExceptions();
Expand Down Expand Up @@ -219,8 +219,8 @@ await BusyWait.Until(
function.ShouldNotBeNull();
function.Status.ShouldBe(Status.Succeeded);
function.Result!.DeserializeFromJsonTo<string>().ShouldBe("something");
var states = await store.StatesStore.GetStates(functionId);
var state = states.Single(e => e.StateId == "State").StateJson.DeserializeFromJsonTo<ListState<string>>();
var states = await store.EffectsStore.GetEffectResults(functionId);
var state = states.Single(e => e.EffectId == "State").Result!.DeserializeFromJsonTo<ListState<string>>();
state.List.Single().ShouldBe("world");

unhandledExceptionCatcher.ShouldNotHaveExceptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ protected async Task SunshineScenarioNullReturningFuncWithState(Task<IFunctionSt
.GetFunction(functionId)
.ShouldNotBeNullAsync();

var states = await store.StatesStore.GetStates(functionId);
var state = states.Single(e => e.StateId == "State").StateJson!.DeserializeFromJsonTo<ListState<string>>();
var states = await store.EffectsStore.GetEffectResults(functionId);
var state = states.Single(e => e.EffectId == "State").Result!.DeserializeFromJsonTo<ListState<string>>();
state.List.Single().ShouldBe("hello world");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,37 @@ public abstract class StatesStoreTests
public abstract Task SunshineScenario();
public async Task SunshineScenario(Task<IFunctionStore> storeTask)
{
var statesStore = await storeTask.SelectAsync(s => s.StatesStore);
var functionId = TestFlowId.Create();
var statesStore = await storeTask.SelectAsync(s => s.EffectsStore);
var flowId = TestFlowId.Create();

var initialStates = await statesStore.GetStates(functionId);
var initialStates = await statesStore.GetEffectResults(flowId);
initialStates.ShouldBeEmpty();

await statesStore.UpsertState(functionId, new StoredState("Id#1", "SomeJson#1"));
await statesStore.UpsertState(functionId, new StoredState("Id#2", "SomeJson#2"));
await statesStore.SetEffectResult(
flowId,
StoredEffect.CreateState(new StoredState("Id#1", "SomeJson#1"))
);
await statesStore.SetEffectResult(
flowId,
StoredEffect.CreateState(new StoredState("Id#2", "SomeJson#2"))
);

var states = await statesStore.GetStates(functionId).ToListAsync();
var states = await statesStore.GetEffectResults(flowId);
states.Count.ShouldBe(2);

var state1 = states.Single(s => s.StateId == "Id#1");
state1.StateJson.ShouldBe("SomeJson#1");
var state1 = states.Single(s => s.EffectId == "Id#1");
state1.IsState.ShouldBeTrue();
state1.Result.ShouldBe("SomeJson#1");

var state2 = states.Single(s => s.StateId == "Id#2");
state2.StateJson.ShouldBe("SomeJson#2");
var state2 = states.Single(s => s.EffectId == "Id#2");
state2.IsState.ShouldBeTrue();
state2.Result.ShouldBe("SomeJson#2");

await statesStore.RemoveState(functionId, state1.StateId);
await statesStore.DeleteEffectResult(flowId, state1.EffectId, isState: true);

states = await statesStore.GetStates(functionId).ToListAsync();
states = await statesStore.GetEffectResults(flowId);
states.Count.ShouldBe(1);
state2 = states.Single(s => s.StateId == "Id#2");
state2.StateJson.ShouldBe("SomeJson#2");
state2 = states.Single(s => s.EffectId == "Id#2");
state2.Result.ShouldBe("SomeJson#2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ await store.CreateFunction(
timestamp: DateTime.UtcNow.Ticks
).ShouldBeTrueAsync();

await store.StatesStore.UpsertState(functionId, new StoredState("SomeStateId", "SomeStateJson"));
await store.EffectsStore.SetEffectResult(
functionId,
StoredEffect.CreateState(new StoredState("SomeStateId", "SomeStateJson"))
);
await store.CorrelationStore.SetCorrelation(functionId, "SomeCorrelationId");
await store.EffectsStore.SetEffectResult(
functionId,
Expand All @@ -162,7 +165,6 @@ await store.TimeoutStore.UpsertTimeout(
await store.DeleteFunction(functionId);

await store.GetFunction(functionId).ShouldBeNullAsync();
await store.StatesStore.GetStates(functionId).ShouldBeEmptyAsync();
await store.CorrelationStore.GetCorrelations(functionId).ShouldBeEmptyAsync();
await store.EffectsStore.GetEffectResults(functionId).ShouldBeEmptyAsync();
await store.MessageStore.GetMessages(functionId, skip: 0).ShouldBeEmptyAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class CrashableFunctionStore : IFunctionStore
public SyncedFlag AfterPostponeFunctionFlag { get; } = new();
public IMessageStore MessageStore => _crashed ? throw new TimeoutException() : _inner.MessageStore;
public IEffectsStore EffectsStore => _crashed ? throw new TimeoutException() : _inner.EffectsStore;
public IStatesStore StatesStore => _crashed ? throw new TimeoutException() : _inner.StatesStore;
public ITimeoutStore TimeoutStore => _crashed ? throw new TimeoutException() : _inner.TimeoutStore;
public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore;
public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ await store.GetFunction(functionId).Map(sf => sf!.Status) == Status.Succeeded,

var storedFunction = await store.GetFunction(functionId);
storedFunction!.Result!.DeserializeFromJsonTo<string>().ShouldBe($"{param.Id}-{param.Value}");
var states = await store.StatesStore.GetStates(functionId);
states.Single(e => e.StateId == "Scraps")
.StateJson
var states = await store.EffectsStore.GetEffectResults(functionId);
states.Single(e => e.EffectId == "Scraps")
.Result!
.DeserializeFromJsonTo<ListState>()
.Scraps
.ShouldBe(new [] {1,2,3,4});
Expand Down Expand Up @@ -508,9 +508,9 @@ await store.GetFunction(functionId).Map(sf => sf!.Status) == Status.Succeeded
paramTcs.Task.Result.ShouldBe(param);

var storedFunction = await store.GetFunction(functionId);
var states = await store.StatesStore.GetStates(functionId);
states.Single(e => e.StateId == "Scraps")
.StateJson
var states = await store.EffectsStore.GetEffectResults(functionId);
states.Single(e => e.EffectId == "Scraps")
.Result!
.DeserializeFromJsonTo<ListState>()
.Scraps
.ShouldBe(new[] {1, 2, 3, 4});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,30 +376,29 @@ public Messages CreateMessages(FlowId flowId, ScheduleReInvocation scheduleReInv

return new Messages(messageWriter, registeredTimeouts, messagesPullerAndEmitter);
}

public Effect CreateEffect(FlowId flowId)
public Tuple<Effect, States> CreateEffectAndStates(FlowId flowId, string? defaultState)
{
var effectsStore = _functionStore.EffectsStore;
return new Effect(

var lazyEffects = new Lazy<Task<IReadOnlyList<StoredEffect>>>(effectsStore.GetEffectResults(flowId));
var states = new States(
flowId,
() => effectsStore.GetEffectResults(flowId),
defaultState,
_functionStore,
effectsStore,
lazyEffects,
_settings.Serializer
);
}

public States CreateStates(FlowId flowId, string? defaultState)
{
var statesStore = _functionStore.StatesStore;
var serializer = _settings.Serializer;

return new States(

var effect = new Effect(
flowId,
defaultState,
_functionStore,
statesStore,
serializer
lazyEffects,
effectsStore,
_settings.Serializer
);

return Tuple.Create(effect, states);
}

public Correlations CreateCorrelations(FlowId flowId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ await _invocationHelper.PersistFunctionInStore(
interruptCount
);

var effect = _invocationHelper.CreateEffect(flowId);
var states = _invocationHelper.CreateStates(flowId, defaultState: null);
var (effect, states) = _invocationHelper.CreateEffectAndStates(flowId, defaultState: null);
var correlations = _invocationHelper.CreateCorrelations(flowId);
var workflow = new Workflow(flowId, messages, effect, states, _utilities, correlations, interruptCount);

Expand Down Expand Up @@ -243,13 +242,14 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(FlowId flowId, R
isWorkflowRunning: () => !isWorkflowRunningDisposable.Disposed,
interruptCount
);
var states = _invocationHelper.CreateStates(flowId, defaultState);

var (effect, states) = _invocationHelper.CreateEffectAndStates(flowId, defaultState);
var correlations = _invocationHelper.CreateCorrelations(flowId);

var workflow = new Workflow(
flowId,
messages,
_invocationHelper.CreateEffect(flowId),
effect,
states,
_utilities,
correlations,
Expand Down
Loading

0 comments on commit 0fd1a96

Please sign in to comment.