From 6a9093dabd429798e50a5327d25f4fce08b75f6f Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 10 Mar 2022 12:07:46 +0000 Subject: [PATCH] Moving Caching logic to central file (#318) --- src/Equinox.Core/Caching.fs | 45 ++++++++++++ src/Equinox.Core/Equinox.Core.fsproj | 1 + src/Equinox.CosmosStore/CosmosStore.fs | 17 ++--- src/Equinox.EventStore/EventStore.fs | 75 +++----------------- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 66 +++-------------- 5 files changed, 69 insertions(+), 135 deletions(-) create mode 100644 src/Equinox.Core/Caching.fs diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs new file mode 100644 index 000000000..db64acd20 --- /dev/null +++ b/src/Equinox.Core/Caching.fs @@ -0,0 +1,45 @@ +module Equinox.Core.Caching + +type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> Async) = + + let cache streamName inner = async { + let! tokenAndState = inner + do! updateCache streamName tokenAndState + return tokenAndState } + + interface ICategory<'event, 'state, string, 'context> with + member _.Load(log, streamName : string, opt) : Async = + inner.Load(log, streamName, opt) |> cache streamName + + member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { + match! inner.TrySync(log, streamName, streamToken, state, events, context) with + | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) + | SyncResult.Written (token', state') -> + do! updateCache streamName (token', state') + return SyncResult.Written (token', state') } + +let applyCacheUpdatesWithSlidingExpiration + (cache : ICache) + (prefix : string) + (slidingExpiration : System.TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes) + let options = CacheItemOptions.RelativeExpiration slidingExpiration + let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + Decorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ + +let applyCacheUpdatesWithFixedTimeSpan + (cache : ICache) + (prefix : string) + (lifetime : System.TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes) + let addOrUpdateFixedLifetimeCacheEntry streamName value = + let expirationPoint = let creationDate = System.DateTimeOffset.UtcNow in creationDate.Add lifetime + let options = CacheItemOptions.AbsoluteExpiration expirationPoint + cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + Decorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 979a1be0f..8970d0d68 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -16,6 +16,7 @@ + diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 9cb9f3119..54d8d1c35 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1132,23 +1132,22 @@ module internal Caching = checkUnfolds, compressUnfolds, mapUnfolds : Choice 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) = let cache streamName inner = async { - let! ts = inner - do! updateCache streamName ts - return ts } + let! tokenAndState = inner + do! updateCache streamName tokenAndState + return tokenAndState } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName, allowStale) : Async = async { match! tryReadCache streamName with | None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName | Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write | Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName } - member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) - : Async> = async { + member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict (cache streamName resync) | SyncResult.Written (token', state') -> - let! res = cache streamName (async { return token', state' }) - return SyncResult.Written res } + do! updateCache streamName (token', state') + return SyncResult.Written (token', state') } module ConnectionString = @@ -1159,12 +1158,10 @@ module ConnectionString = namespace Equinox.CosmosStore -open Equinox open Equinox.Core open Equinox.CosmosStore.Core open FsCodec open Microsoft.Azure.Cosmos -open Serilog open System [] @@ -1447,13 +1444,11 @@ type CosmosStoreCategory<'event, 'state, 'context> let cosmosCat = Category<'event, 'state, 'context>(container, codec) Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _ categories.GetOrAdd(categoryName, createCategory) - let resolve (StreamName.CategoryAndId (categoryName, streamId)) = let container, streamName, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) resolveCategory (categoryName, container), streamName, maybeContainerInitializationGate let empty = Token.create Position.fromKnownEmpty, initial let storeCategory = StoreCategory(resolve, empty) - member _.Resolve(streamName, ?context) = storeCategory.Resolve(streamName, ?context = context) namespace Equinox.CosmosStore.Core diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 805fc3878..dcf906855 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -143,8 +143,7 @@ module private Write = /// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) : Async = async { - try - let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect + try let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect return EsSyncResult.Written wr with :? EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex -> log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}", @@ -408,8 +407,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = member _.TrySync log streamName (Token.Unpack token as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { let streamVersion = token.streamVersion - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.Conflict actualVersion -> return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> @@ -426,8 +424,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData[]) : Async = async { let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.Conflict actualVersion -> return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> @@ -493,62 +490,13 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod | Some (AccessStrategy.RollingSnapshots (_, compact)) -> let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events - let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate - match syncRes with + match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with | GatewaySyncResult.ConflictUnknown _ -> return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } -module Caching = - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async) = - let intercept streamName tokenAndState = async { - let! _ = tee streamName tokenAndState - return tokenAndState } - - let loadAndIntercept load streamName = async { - let! tokenAndState = load - return! intercept streamName tokenAndState } - - interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName : string, opt) : Async = - loadAndIntercept (inner.Load(log, streamName, opt)) streamName - - member _.TrySync(log : ILogger, stream, token, state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, stream, token, state, events, context) - match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream) - | SyncResult.Written (token', state') -> - let! intercepted = intercept stream (token', state') - return SyncResult.Written intercepted } - - let applyCacheUpdatesWithSlidingExpiration - (cache : ICache) - (prefix : string) - (slidingExpiration : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let options = CacheItemOptions.RelativeExpiration slidingExpiration - let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ - - let applyCacheUpdatesWithFixedTimeSpan - (cache : ICache) - (prefix : string) - (period : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let addOrUpdateFixedLifetimeCacheEntry streamName value = - let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period - let options = CacheItemOptions.AbsoluteExpiration expirationPoint - cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ - type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with @@ -560,10 +508,8 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state | None -> return! batched log streamName | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } - - member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context) - match syncRes with + member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async> = async { + match! category.TrySync(log, fold, streamName, token, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } @@ -597,7 +543,6 @@ type EventStoreCategory<'event, 'state, 'context> + "mixing AccessStrategy.LatestKnownEvent with Caching at present." |> invalidOp | _ -> () - let inner = Category<'event, 'state, 'context>(context, codec, ?access = access) let readCacheOption = match caching with @@ -605,18 +550,16 @@ type EventStoreCategory<'event, 'state, 'context> | Some (CachingStrategy.SlidingWindow (cache, _)) | Some (CachingStrategy.FixedTimeSpan (cache, _)) -> Some (cache, null) | Some (CachingStrategy.SlidingWindowPrefixed (cache, _, prefix)) -> Some (cache, prefix) - let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption) - let category : ICategory<_, _, _, 'context> = match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow (cache, window)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes | Some (CachingStrategy.FixedTimeSpan (cache, period)) -> - Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder + Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes let resolve streamName = category, FsCodec.StreamName.toString streamName, None let empty = context.TokenEmpty, initial let storeCategory = StoreCategory(resolve, empty) diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 8ae2c3372..32c006c6e 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -136,8 +136,7 @@ module private Write = /// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) : Async = async { - try - let! wr = conn.AppendToStream(StreamId streamName, (if version = -1L then ExpectedVersion.NoStream else int version), events) |> Async.AwaitTaskCorrect + try let! wr = conn.AppendToStream(StreamId streamName, (if version = -1L then ExpectedVersion.NoStream else int version), events) |> Async.AwaitTaskCorrect return EsSyncResult.Written wr with :? WrongExpectedVersionException as ex -> log.Information(ex, "SqlEs TrySync WrongExpectedVersionException writing {EventTypes}, expected {ExpectedVersion}", @@ -379,8 +378,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } member _.TrySync log streamName (Token.Unpack pos as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { - let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents - match wr with + match! Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents with | EsSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown | EsSyncResult.Written wr -> @@ -396,8 +394,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat return GatewaySyncResult.Written token } member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData[]) : Async = async { let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent - let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown | EsSyncResult.Written wr -> @@ -456,59 +453,13 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, | Some (AccessStrategy.RollingSnapshots (_, compact)) -> let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events - let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate - match syncRes with + match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with | GatewaySyncResult.ConflictUnknown -> return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } -module Caching = - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async) = - let intercept streamName tokenAndState = async { - let! _ = tee streamName tokenAndState - return tokenAndState } - let loadAndIntercept load streamName = async { - let! tokenAndState = load - return! intercept streamName tokenAndState } - interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName : string, opt) : Async = - loadAndIntercept (inner.Load(log, streamName, opt)) streamName - member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, streamName, streamToken, state, events, context) - match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync streamName) - | SyncResult.Written (token', state') -> - let! intercepted = intercept streamName (token', state') - return SyncResult.Written intercepted } - - let applyCacheUpdatesWithSlidingExpiration - (cache : ICache) - (prefix : string) - (slidingExpiration : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let options = CacheItemOptions.RelativeExpiration slidingExpiration - let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ - - let applyCacheUpdatesWithFixedTimeSpan - (cache : ICache) - (prefix : string) - (lifetime : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let addOrUpdateFixedLifetimeCacheEntry streamName value = - let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add lifetime - let options = CacheItemOptions.AbsoluteExpiration expirationPoint - cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ - type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with @@ -521,8 +472,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, streamName, streamToken, initialState, events, context) - match syncRes with + match! category.TrySync(log, fold, streamName, streamToken, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } @@ -568,11 +518,11 @@ type SqlStreamStoreCategory<'event, 'state, 'context> match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow (cache, window)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes | Some (CachingStrategy.FixedTimeSpan (cache, period)) -> - Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder + Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes let resolve streamName = category, FsCodec.StreamName.toString streamName, None let empty = context.TokenEmpty, initial let storeCategory = StoreCategory<'event, 'state, FsCodec.StreamName, 'context>(resolve, empty)