Skip to content

Commit

Permalink
Non-taskify ControlPanel's Events
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Dec 15, 2023
1 parent ba09801 commit 107811a
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Cleipnir.ResilientFunctions.Domain.Exceptions;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Reactive;
using Cleipnir.ResilientFunctions.Reactive.Extensions;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
Expand Down Expand Up @@ -754,7 +753,7 @@ async Task(string param, RScrapbook _, Context context) =>
await rAction.Invoke(functionInstanceId.Value, param: "param");

var controlPanel = await rAction.ControlPanel(functionInstanceId).ShouldNotBeNullAsync();
var existingEvents = await controlPanel.Events;
var existingEvents = controlPanel.Events;
existingEvents.Count().ShouldBe(1);
existingEvents[0].ShouldBe("param");
existingEvents[0] = "hello";
Expand Down Expand Up @@ -799,7 +798,7 @@ async Task(string param, RScrapbook _, Context context) =>

var controlPanel = await rAction.ControlPanel(functionInstanceId).ShouldNotBeNullAsync();
controlPanel.Status.ShouldBe(Status.Succeeded);
var existingEvents = await controlPanel.Events;
var existingEvents = controlPanel.Events;
existingEvents.Count().ShouldBe(2);
existingEvents.Clear();
existingEvents.EventsWithIdempotencyKeys.Add(new EventAndIdempotencyKey("hello to you", "1"));
Expand All @@ -810,7 +809,7 @@ async Task(string param, RScrapbook _, Context context) =>
await controlPanel.Refresh();
await controlPanel.ReInvoke();

(await controlPanel.Events).Count().ShouldBe(2);
controlPanel.Events.Count().ShouldBe(2);

syncedList.ShouldNotBeNull();
if (syncedList.Count != 2)
Expand Down Expand Up @@ -857,7 +856,7 @@ async Task(string param, RScrapbook _, Context context) =>
await controlPanel.SaveChanges();
await controlPanel.Refresh();

var events = (await controlPanel.Events).EventsWithIdempotencyKeys;
var events = controlPanel.Events.EventsWithIdempotencyKeys;
events.Count.ShouldBe(2);
events[0].Event.ShouldBe("hello world");
events[0].IdempotencyKey.ShouldBe("1");
Expand Down Expand Up @@ -886,7 +885,7 @@ protected async Task ConcurrentModificationOfExistingEventsCausesExceptionOnSave
await store.EventStore.AppendEvent(functionId, "hello world".ToJson(), typeof(string).SimpleQualifiedName());

var controlPanel = await rAction.ControlPanel(functionInstanceId).ShouldNotBeNullAsync();
var existingEvents = await controlPanel.Events;
var existingEvents = controlPanel.Events;
existingEvents.Count().ShouldBe(1);

await store.EventStore.AppendEvent(functionId, "hello universe".ToJson(), typeof(string).SimpleQualifiedName());
Expand Down Expand Up @@ -930,7 +929,7 @@ protected async Task ConcurrentModificationOfExistingEventsDoesNotCauseException
controlPanel.Epoch.ShouldBe(epoch);
controlPanel.Param.ShouldBe(param);

var events = await controlPanel.Events;
var events = controlPanel.Events;
events.Count().ShouldBe(2);
events[0].ShouldBe("hello world");
events[1].ShouldBe("hello universe");
Expand All @@ -957,7 +956,7 @@ protected async Task ConcurrentModificationOfExistingEventsCausesExceptionOnSave
await store.EventStore.AppendEvent(functionId, "hello world".ToJson(), typeof(string).SimpleQualifiedName());

var controlPanel = await rAction.ControlPanel(functionInstanceId).ShouldNotBeNullAsync();
var existingEvents = await controlPanel.Events;
var existingEvents = controlPanel.Events;
existingEvents.Count().ShouldBe(1);

await store.EventStore.AppendEvent(functionId, "hello universe".ToJson(), typeof(string).SimpleQualifiedName());
Expand Down Expand Up @@ -1001,7 +1000,7 @@ protected async Task ConcurrentModificationOfExistingEventsDoesNotCauseException
controlPanel.Epoch.ShouldBe(epoch);
controlPanel.Param.ShouldBe(param);

var events = await controlPanel.Events;
var events = controlPanel.Events;
events.Count().ShouldBe(2);
events[0].ShouldBe("hello world");
events[1].ShouldBe("hello universe");
Expand Down Expand Up @@ -1031,7 +1030,7 @@ await rAction.Invoke(
);

var controlPanel = await rAction.ControlPanel(functionInstanceId).ShouldNotBeNullAsync();
var existingEvents = await controlPanel.Events;
var existingEvents = controlPanel.Events;
var (@event, idempotencyKey) = existingEvents.EventsWithIdempotencyKeys.Single();
@event.ShouldBe("hello world");
idempotencyKey.ShouldBe("first");
Expand All @@ -1043,7 +1042,7 @@ await rAction.Invoke(

await controlPanel.Refresh();

existingEvents = await controlPanel.Events;
existingEvents = controlPanel.Events;
(@event, idempotencyKey) = existingEvents.EventsWithIdempotencyKeys.Single();
@event.ShouldBe("hello universe");
idempotencyKey.ShouldBe("second");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ public async Task<bool> SetFunctionState(
TReturn? result,
DateTime? postponeUntil,
Exception? exception,
ExistingEvents? existingEvents,
int expectedEpoch
)
{
Expand Down
39 changes: 18 additions & 21 deletions Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal ControlPanel(
TScrapbook scrapbook,
DateTime? postponedUntil,
ExistingActivities existingActivities,
ExistingEvents existingEvents,
PreviouslyThrownException? previouslyThrownException)
{
_invoker = invoker;
Expand All @@ -38,16 +39,15 @@ internal ControlPanel(
PostponedUntil = postponedUntil;
PreviouslyThrownException = previouslyThrownException;
Activities = existingActivities;
Events = existingEvents;
}

public FunctionId FunctionId { get; }
public Status Status { get; private set; }

public int Epoch { get; private set; }
public DateTime LeaseExpiration { get; private set; }

private Task<ExistingEvents>? _events;
public Task<ExistingEvents> Events => _events ??= _invocationHelper.GetExistingEvents(FunctionId);
public ExistingEvents Events { get; private set; }
public ExistingActivities Activities { get; private set; }
public ITimeoutProvider TimeoutProvider => _invocationHelper.CreateTimeoutProvider(FunctionId);

Expand Down Expand Up @@ -96,7 +96,6 @@ public async Task Succeed()

Epoch++;
_changed = false;
_events = null;
}

public async Task Postpone(DateTime until)
Expand All @@ -113,7 +112,6 @@ public async Task Postpone(DateTime until)
Status = Status.Postponed;
PostponedUntil = until;
_changed = false;
_events = null;
}

public Task Postpone(TimeSpan delay) => Postpone(DateTime.UtcNow + delay);
Expand All @@ -132,7 +130,6 @@ public async Task Fail(Exception exception)
Status = Status.Failed;
PreviouslyThrownException = new PreviouslyThrownException(exception.Message, exception.StackTrace, exception.GetType());
_changed = false;
_events = null;
}

public async Task SaveChanges()
Expand All @@ -143,7 +140,6 @@ public async Task SaveChanges()

Epoch++;
_changed = false;
_events = null;
}

public Task Delete() => _invocationHelper.Delete(FunctionId, Epoch);
Expand Down Expand Up @@ -177,7 +173,7 @@ public async Task Refresh()
PostponedUntil = sf.PostponedUntil;
PreviouslyThrownException = sf.PreviouslyThrownException;
_changed = false;
_events = null;
Events = await _invocationHelper.GetExistingEvents(FunctionId);
Activities = await _invocationHelper.GetExistingActivities(FunctionId);
}

Expand All @@ -202,6 +198,7 @@ internal ControlPanel(
TReturn? result,
DateTime? postponedUntil,
ExistingActivities activities,
ExistingEvents events,
PreviouslyThrownException? previouslyThrownException)
{
_invoker = invoker;
Expand All @@ -216,6 +213,7 @@ internal ControlPanel(
Result = result;
PostponedUntil = postponedUntil;
Activities = activities;
Events = events;
PreviouslyThrownException = previouslyThrownException;
}

Expand All @@ -225,8 +223,7 @@ internal ControlPanel(
public int Epoch { get; private set; }
public DateTime LeaseExpiration { get; private set; }

private Task<ExistingEvents>? _events;
public Task<ExistingEvents> Events => _events ??= _invocationHelper.GetExistingEvents(FunctionId);
public ExistingEvents Events { get; private set; }
public ExistingActivities Activities { get; private set; }

public ITimeoutProvider TimeoutProvider => _invocationHelper.CreateTimeoutProvider(FunctionId);
Expand Down Expand Up @@ -268,8 +265,10 @@ public TScrapbook Scrapbook
public async Task Succeed(TReturn result)
{
var success = await _invocationHelper.SetFunctionState(
FunctionId, Status.Succeeded, Param, Scrapbook, result, PostponedUntil, exception: null,
existingEvents: _events == null ? null : await _events,
FunctionId, Status.Succeeded,
Param, Scrapbook,
result,
PostponedUntil, exception: null,
Epoch
);

Expand All @@ -279,14 +278,15 @@ public async Task Succeed(TReturn result)
Epoch++;
Status = Status.Succeeded;
_changed = false;
_events = null;
}

public async Task Postpone(DateTime until)
{
var success = await _invocationHelper.SetFunctionState(
FunctionId, Status.Postponed, Param, Scrapbook, result: default, until, exception: null,
existingEvents: _events == null ? null : await _events,
FunctionId, Status.Postponed,
Param, Scrapbook,
result: default, until,
exception: null,
Epoch
);

Expand All @@ -297,16 +297,16 @@ public async Task Postpone(DateTime until)
Status = Status.Postponed;
PostponedUntil = until;
_changed = false;
_events = null;
}

public Task Postpone(TimeSpan delay) => Postpone(DateTime.UtcNow + delay);

public async Task Fail(Exception exception)
{
var success = await _invocationHelper.SetFunctionState(
FunctionId, Status.Failed, Param, Scrapbook, result: default, postponeUntil: null, exception,
existingEvents: _events == null ? null : await _events,
FunctionId, Status.Failed,
Param, Scrapbook,
result: default, postponeUntil: null, exception,
Epoch
);

Expand All @@ -317,7 +317,6 @@ public async Task Fail(Exception exception)
Status = Status.Failed;
PreviouslyThrownException = new PreviouslyThrownException(exception.Message, exception.StackTrace, exception.GetType());
_changed = false;
_events = null;
}

public async Task SaveChanges()
Expand All @@ -328,7 +327,6 @@ public async Task SaveChanges()

Epoch++;
_changed = false;
_events = null;
}

public Task Delete() => _invocationHelper.Delete(FunctionId, Epoch);
Expand Down Expand Up @@ -365,7 +363,6 @@ public async Task Refresh()
Activities = await _invocationHelper.GetExistingActivities(FunctionId);

_changed = false;
_events = null;
}

public async Task<TReturn> WaitForCompletion(bool allowPostponeAndSuspended = false)
Expand Down
20 changes: 11 additions & 9 deletions Core/Cleipnir.ResilientFunctions/Domain/ControlPanels.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ internal ControlPanels(FunctionTypeId functionTypeId, Invoker<TParam, TScrapbook
public async Task<ControlPanel<TParam, TScrapbook>?> For(FunctionInstanceId functionInstanceId)
{
var functionId = new FunctionId(_functionTypeId, functionInstanceId);
var f = await _invocationHelper.GetFunction(functionId);
if (f == null)
var functionState = await _invocationHelper.GetFunction(functionId);
if (functionState == null)
return null;

return new ControlPanel<TParam, TScrapbook>(
_invoker,
_invocationHelper,
functionId,
f.Status,
f.Epoch,
f.LeaseExpiration,
f.Param,
f.Scrapbook,
f.PostponedUntil,
functionState.Status,
functionState.Epoch,
functionState.LeaseExpiration,
functionState.Param,
functionState.Scrapbook,
functionState.PostponedUntil,
await _invocationHelper.GetExistingActivities(functionId),
f.PreviouslyThrownException
await _invocationHelper.GetExistingEvents(functionId),
functionState.PreviouslyThrownException
);
}
}
Expand Down Expand Up @@ -75,6 +76,7 @@ internal ControlPanels(FunctionTypeId functionTypeId, Invoker<TParam, TScrapbook
f.Result,
f.PostponedUntil,
await _invocationHelper.GetExistingActivities(functionId),
await _invocationHelper.GetExistingEvents(functionId),
f.PreviouslyThrownException
);
}
Expand Down

0 comments on commit 107811a

Please sign in to comment.