diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventStoreTests.cs index 875aaa83..d415f2d8 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventStoreTests.cs @@ -512,7 +512,7 @@ await functionStore.CreateFunction( ); await eventStore.AppendEvent(functionId, event1); - var newEvents = await subscription.Pull(); + var newEvents = await subscription.PullNewEvents(); newEvents.Count.ShouldBe(1); var storedEvent = newEvents[0]; var @event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType); @@ -526,14 +526,14 @@ await functionStore.CreateFunction( ); await eventStore.AppendEvent(functionId, event2); - newEvents = await subscription.Pull(); + newEvents = await subscription.PullNewEvents(); newEvents.Count.ShouldBe(1); storedEvent = newEvents[0]; @event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType); @event.ShouldBe("hello universe"); storedEvent.IdempotencyKey.ShouldBe("idempotency_key_2"); - await subscription.Pull().SelectAsync(l => l.Count).ShouldBeAsync(0); + await subscription.PullNewEvents().SelectAsync(l => l.Count).ShouldBeAsync(0); } public abstract Task EventSubscriptionPublishesFiltersOutEventsWithSameIdempotencyKeys(); @@ -561,7 +561,7 @@ await functionStore.CreateFunction( ); await eventStore.AppendEvent(functionId, event1); - var newEvents = await subscription.Pull(); + var newEvents = await subscription.PullNewEvents(); newEvents.Count.ShouldBe(1); var storedEvent = newEvents[0]; var @event = DefaultSerializer.Instance.DeserializeEvent(storedEvent.EventJson, storedEvent.EventType); @@ -575,10 +575,10 @@ await functionStore.CreateFunction( ); await eventStore.AppendEvent(functionId, event2); - newEvents = await subscription.Pull(); + newEvents = await subscription.PullNewEvents(); newEvents.Count.ShouldBe(0); - newEvents = await subscription.Pull(); + newEvents = await subscription.PullNewEvents(); newEvents.Count.ShouldBe(0); } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventsSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventsSubscriptionTests.cs index 71b730a9..1911ff86 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventsSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/EventsSubscriptionTests.cs @@ -31,7 +31,7 @@ await functionStore.CreateFunction( var subscription = eventStore.SubscribeToEvents(functionId); - var events = await subscription.Pull(); + var events = await subscription.PullNewEvents(); events.ShouldBeEmpty(); await eventStore.AppendEvent( @@ -40,14 +40,14 @@ await eventStore.AppendEvent( eventType: typeof(string).SimpleQualifiedName() ); - events = await subscription.Pull(); + events = await subscription.PullNewEvents(); events.Count.ShouldBe(1); DefaultSerializer .Instance .DeserializeEvent(events[0].EventJson, events[0].EventType) .ShouldBe("hello world"); - events = await subscription.Pull(); + events = await subscription.PullNewEvents(); events.ShouldBeEmpty(); await eventStore.AppendEvent( @@ -56,7 +56,7 @@ await eventStore.AppendEvent( eventType: typeof(string).SimpleQualifiedName() ); - events = await subscription.Pull(); + events = await subscription.PullNewEvents(); events.Count.ShouldBe(1); DefaultSerializer @@ -72,7 +72,7 @@ await eventStore.AppendEvent( eventType: typeof(string).SimpleQualifiedName() ); - events = await subscription.Pull(); + events = await subscription.PullNewEvents(); events.ShouldBeEmpty(); } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs b/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs index d2314c69..5b57b865 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/EventSource.cs @@ -97,7 +97,7 @@ private async Task DeliverOutstandingEvents(bool deliverDespiteNoActiveSubscript try { - var storedEvents = await _eventsSubscription!.Pull(); + var storedEvents = await _eventsSubscription!.PullNewEvents(); foreach (var storedEvent in storedEvents) { if (storedEvent.IdempotencyKey != null) diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/EventsSubscription.cs b/Core/Cleipnir.ResilientFunctions/Messaging/EventsSubscription.cs index 1df5f84b..c2c4362d 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/EventsSubscription.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/EventsSubscription.cs @@ -6,19 +6,19 @@ namespace Cleipnir.ResilientFunctions.Messaging; public sealed class EventsSubscription : IAsyncDisposable { - private readonly Func>> _pullEvents; + private readonly Func>> _pullNewEvents; private readonly Func _dispose; private bool _disposed; private readonly object _sync = new(); - public EventsSubscription(Func>> pullEvents, Func dispose) + public EventsSubscription(Func>> pullNewEvents, Func dispose) { - _pullEvents = pullEvents; + _pullNewEvents = pullNewEvents; _dispose = dispose; } - public Task> Pull() => _pullEvents(); + public Task> PullNewEvents() => _pullNewEvents(); public ValueTask DisposeAsync() { diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index b11b6a1b..22790182 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -479,7 +479,7 @@ public virtual EventsSubscription SubscribeToEvents(FunctionId functionId) var skip = 0; var subscription = new EventsSubscription( - pullEvents: () => + pullNewEvents: () => { List? events; diff --git a/Stores/AzureBlob/Cleipnir.ResilientFunctions.AzureBlob/AzureBlobEventStore.cs b/Stores/AzureBlob/Cleipnir.ResilientFunctions.AzureBlob/AzureBlobEventStore.cs index 69a2381b..7a95874d 100644 --- a/Stores/AzureBlob/Cleipnir.ResilientFunctions.AzureBlob/AzureBlobEventStore.cs +++ b/Stores/AzureBlob/Cleipnir.ResilientFunctions.AzureBlob/AzureBlobEventStore.cs @@ -208,7 +208,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId) var idempotencyKeys = new HashSet(); var subscription = new EventsSubscription( - pullEvents: async () => + pullNewEvents: async () => { lock (sync) if (disposed) diff --git a/Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlEventStore.cs b/Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlEventStore.cs index 00f24f48..4d56a2fb 100644 --- a/Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlEventStore.cs +++ b/Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlEventStore.cs @@ -292,7 +292,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId) var disposed = false; var subscription = new EventsSubscription( - pullEvents: async () => + pullNewEvents: async () => { lock (sync) if (disposed) diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEventStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEventStore.cs index 0a5eaac0..d74ec894 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEventStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEventStore.cs @@ -295,7 +295,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId) var skip = 0; var subscription = new EventsSubscription( - async () => + pullNewEvents: async () => { lock (sync) if (disposed) diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEventStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEventStore.cs index bf128cc8..a31b609d 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEventStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEventStore.cs @@ -243,7 +243,7 @@ public EventsSubscription SubscribeToEvents(FunctionId functionId) var skip = 0; var subscription = new EventsSubscription( - pullEvents: async () => + pullNewEvents: async () => { lock (sync) if (disposed)