Skip to content

Commit

Permalink
EventsSubscription Pull-method renamed to PullNewEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Oct 15, 2023
1 parent 94169d8 commit 9a6a1a4
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ await functionStore.CreateFunction(
);
await eventStore.AppendEvent(functionId, event1);

var newEvents = await subscription.Pull();
var newEvents = await subscription.PullNewEvents();
newEvents.Count.ShouldBe(1);
var storedEvent = newEvents[0];
var @event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType);
Expand All @@ -526,14 +526,14 @@ await functionStore.CreateFunction(
);
await eventStore.AppendEvent(functionId, event2);

newEvents = await subscription.Pull();
newEvents = await subscription.PullNewEvents();
newEvents.Count.ShouldBe(1);
storedEvent = newEvents[0];
@event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType);
@event.ShouldBe("hello universe");
storedEvent.IdempotencyKey.ShouldBe("idempotency_key_2");

await subscription.Pull().SelectAsync(l => l.Count).ShouldBeAsync(0);
await subscription.PullNewEvents().SelectAsync(l => l.Count).ShouldBeAsync(0);
}

public abstract Task EventSubscriptionPublishesFiltersOutEventsWithSameIdempotencyKeys();
Expand Down Expand Up @@ -561,7 +561,7 @@ await functionStore.CreateFunction(
);
await eventStore.AppendEvent(functionId, event1);

var newEvents = await subscription.Pull();
var newEvents = await subscription.PullNewEvents();
newEvents.Count.ShouldBe(1);
var storedEvent = newEvents[0];
var @event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType);
Expand All @@ -575,10 +575,10 @@ await functionStore.CreateFunction(
);
await eventStore.AppendEvent(functionId, event2);

newEvents = await subscription.Pull();
newEvents = await subscription.PullNewEvents();
newEvents.Count.ShouldBe(0);

newEvents = await subscription.Pull();
newEvents = await subscription.PullNewEvents();
newEvents.Count.ShouldBe(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ await functionStore.CreateFunction(

var subscription = eventStore.SubscribeToEvents(functionId);

var events = await subscription.Pull();
var events = await subscription.PullNewEvents();
events.ShouldBeEmpty();

await eventStore.AppendEvent(
Expand All @@ -40,14 +40,14 @@ await eventStore.AppendEvent(
eventType: typeof(string).SimpleQualifiedName()
);

events = await subscription.Pull();
events = await subscription.PullNewEvents();
events.Count.ShouldBe(1);
DefaultSerializer
.Instance
.DeserializeEvent(events[0].EventJson, events[0].EventType)
.ShouldBe("hello world");

events = await subscription.Pull();
events = await subscription.PullNewEvents();
events.ShouldBeEmpty();

await eventStore.AppendEvent(
Expand All @@ -56,7 +56,7 @@ await eventStore.AppendEvent(
eventType: typeof(string).SimpleQualifiedName()
);

events = await subscription.Pull();
events = await subscription.PullNewEvents();
events.Count.ShouldBe(1);

DefaultSerializer
Expand All @@ -72,7 +72,7 @@ await eventStore.AppendEvent(
eventType: typeof(string).SimpleQualifiedName()
);

events = await subscription.Pull();
events = await subscription.PullNewEvents();
events.ShouldBeEmpty();
}
}
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private async Task DeliverOutstandingEvents(bool deliverDespiteNoActiveSubscript

try
{
var storedEvents = await _eventsSubscription!.Pull();
var storedEvents = await _eventsSubscription!.PullNewEvents();
foreach (var storedEvent in storedEvents)
{
if (storedEvent.IdempotencyKey != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ namespace Cleipnir.ResilientFunctions.Messaging;

public sealed class EventsSubscription : IAsyncDisposable
{
private readonly Func<Task<IReadOnlyList<StoredEvent>>> _pullEvents;
private readonly Func<Task<IReadOnlyList<StoredEvent>>> _pullNewEvents;
private readonly Func<ValueTask> _dispose;
private bool _disposed;

private readonly object _sync = new();

public EventsSubscription(Func<Task<IReadOnlyList<StoredEvent>>> pullEvents, Func<ValueTask> dispose)
public EventsSubscription(Func<Task<IReadOnlyList<StoredEvent>>> pullNewEvents, Func<ValueTask> dispose)
{
_pullEvents = pullEvents;
_pullNewEvents = pullNewEvents;
_dispose = dispose;
}

public Task<IReadOnlyList<StoredEvent>> Pull() => _pullEvents();
public Task<IReadOnlyList<StoredEvent>> PullNewEvents() => _pullNewEvents();

public ValueTask DisposeAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public virtual EventsSubscription SubscribeToEvents(FunctionId functionId)
var skip = 0;

var subscription = new EventsSubscription(
pullEvents: () =>
pullNewEvents: () =>
{
List<StoredEvent>? events;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId)
var idempotencyKeys = new HashSet<string>();

var subscription = new EventsSubscription(
pullEvents: async () =>
pullNewEvents: async () =>
{
lock (sync)
if (disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId)
var disposed = false;

var subscription = new EventsSubscription(
pullEvents: async () =>
pullNewEvents: async () =>
{
lock (sync)
if (disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId)
var skip = 0;

var subscription = new EventsSubscription(
async () =>
pullNewEvents: async () =>
{
lock (sync)
if (disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId)
var skip = 0;

var subscription = new EventsSubscription(
pullEvents: async () =>
pullNewEvents: async () =>
{
lock (sync)
if (disposed)
Expand Down

0 comments on commit 9a6a1a4

Please sign in to comment.