From d22930785545b54ce5e2ab0ed7364ca8e1de1fef Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 9 Mar 2022 01:14:31 +0000 Subject: [PATCH 1/6] Remove Asyncness from Cache --- src/Equinox.Core/Cache.fs | 21 ++++++++++---------- src/Equinox.CosmosStore/CosmosStore.fs | 6 +++--- src/Equinox.EventStore/EventStore.fs | 14 ++++++------- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 14 ++++++------- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index f98b6376b..cc1107658 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -21,8 +21,8 @@ type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, superse currentToken, currentState type ICache = - abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> Async - abstract member TryGet : key: string -> Async<(StreamToken * 'state) option> + abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> unit + abstract member TryGet : key: string -> (StreamToken * 'state) option namespace Equinox @@ -41,16 +41,15 @@ type Cache(name, sizeMb : int) = | RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative) interface ICache with - member _.UpdateIfNewer(key, options, entry) = async { + member _.UpdateIfNewer(key, options, entry) = let policy = toPolicy options match cache.AddOrGetExisting(key, box entry, policy) with | null -> () | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry - | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x } - - member _.TryGet key = async { - return - match cache.Get key with - | null -> None - | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x } + | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x + + member _.TryGet key = + match cache.Get key with + | null -> None + | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value + | x -> failwithf "TryGet Incompatible cache entry %A" x diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 9cb9f3119..6c5697038 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1133,11 +1133,11 @@ module internal Caching = mapUnfolds : Choice 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) = let cache streamName inner = async { let! ts = inner - do! updateCache streamName ts + do updateCache streamName ts return ts } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName, allowStale) : Async = async { - match! tryReadCache streamName with + match tryReadCache streamName with | None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName | Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write | Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName } @@ -1433,7 +1433,7 @@ type CosmosStoreCategory<'event, 'state, 'context> let createCategory _name : ICategory<_, _, string, 'context> = let tryReadCache, updateCache = match caching with - | CachingStrategy.NoCaching -> (fun _ -> async { return None }), fun _ _ -> async { () } + | CachingStrategy.NoCaching -> (fun _ -> None), fun _ _ -> () | CachingStrategy.SlidingWindow (cache, window) -> cache.TryGet, Caching.applyCacheUpdatesWithSlidingExpiration (cache, null) window | CachingStrategy.FixedTimeSpan (cache, period) -> cache.TryGet, Caching.applyCacheUpdatesWithFixedTimeSpan (cache, null) period let isOrigin, checkUnfolds, mapUnfolds = diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 805fc3878..0b0dd9b03 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -504,14 +504,14 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod module Caching = /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async) = - let intercept streamName tokenAndState = async { - let! _ = tee streamName tokenAndState - return tokenAndState } + type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> unit) = + let intercept streamName tokenAndState = + let _ = tee streamName tokenAndState + tokenAndState let loadAndIntercept load streamName = async { let! tokenAndState = load - return! intercept streamName tokenAndState } + return intercept streamName tokenAndState } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName : string, opt) : Async = @@ -522,7 +522,7 @@ module Caching = match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream) | SyncResult.Written (token', state') -> - let! intercepted = intercept stream (token', state') + let intercepted = intercept stream (token', state') return SyncResult.Written intercepted } let applyCacheUpdatesWithSlidingExpiration @@ -556,7 +556,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state match readCache with | None -> batched log streamName | Some (cache : ICache, prefix : string) -> async { - match! cache.TryGet(prefix + streamName) with + match cache.TryGet(prefix + streamName) with | None -> return! batched log streamName | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 8ae2c3372..55a7309ce 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -467,13 +467,13 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, module Caching = /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async) = - let intercept streamName tokenAndState = async { - let! _ = tee streamName tokenAndState - return tokenAndState } + type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> unit) = + let intercept streamName tokenAndState = + let _ = tee streamName tokenAndState + tokenAndState let loadAndIntercept load streamName = async { let! tokenAndState = load - return! intercept streamName tokenAndState } + return intercept streamName tokenAndState } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName : string, opt) : Async = loadAndIntercept (inner.Load(log, streamName, opt)) streamName @@ -482,7 +482,7 @@ module Caching = match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync streamName) | SyncResult.Written (token', state') -> - let! intercepted = intercept streamName (token', state') + let intercepted = intercept streamName (token', state') return SyncResult.Written intercepted } let applyCacheUpdatesWithSlidingExpiration @@ -516,7 +516,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state match readCache with | None -> batched log streamName | Some (cache : ICache, prefix : string) -> async { - match! cache.TryGet(prefix + streamName) with + match cache.TryGet(prefix + streamName) with | None -> return! batched log streamName | Some tokenAndState when allowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } From 9dff08b552e11acc05bae66d17efcc15412ef873 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 9 Mar 2022 09:48:48 +0000 Subject: [PATCH 2/6] Formatting --- src/Equinox.CosmosStore/CosmosStore.fs | 25 ++++----- src/Equinox.EventStore/EventStore.fs | 58 +++++++------------- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 47 +++++++--------- 3 files changed, 50 insertions(+), 80 deletions(-) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 6c5697038..faa99bfc3 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1132,23 +1132,22 @@ module internal Caching = checkUnfolds, compressUnfolds, mapUnfolds : Choice 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) = let cache streamName inner = async { - let! ts = inner - do updateCache streamName ts - return ts } + let! tokenAndState = inner + updateCache streamName tokenAndState + return tokenAndState } interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName, allowStale) : Async = async { + member _.Load(log, streamName, allowStale) : Async = match tryReadCache streamName with - | None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName - | Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write - | Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName } - member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) - : Async> = async { + | None -> category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName + | Some tokenAndState when allowStale -> async { return tokenAndState } // read already updated TTL, no need to write + | Some (token, state) -> category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName + member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict (cache streamName resync) | SyncResult.Written (token', state') -> - let! res = cache streamName (async { return token', state' }) - return SyncResult.Written res } + updateCache streamName (token', state') + return SyncResult.Written (token', state') } module ConnectionString = @@ -1159,12 +1158,10 @@ module ConnectionString = namespace Equinox.CosmosStore -open Equinox open Equinox.Core open Equinox.CosmosStore.Core open FsCodec open Microsoft.Azure.Cosmos -open Serilog open System [] @@ -1447,13 +1444,11 @@ type CosmosStoreCategory<'event, 'state, 'context> let cosmosCat = Category<'event, 'state, 'context>(container, codec) Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _ categories.GetOrAdd(categoryName, createCategory) - let resolve (StreamName.CategoryAndId (categoryName, streamId)) = let container, streamName, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) resolveCategory (categoryName, container), streamName, maybeContainerInitializationGate let empty = Token.create Position.fromKnownEmpty, initial let storeCategory = StoreCategory(resolve, empty) - member _.Resolve(streamName, ?context) = storeCategory.Resolve(streamName, ?context = context) namespace Equinox.CosmosStore.Core diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 0b0dd9b03..12d6abd54 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -143,8 +143,7 @@ module private Write = /// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) : Async = async { - try - let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect + try let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect return EsSyncResult.Written wr with :? EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex -> log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}", @@ -408,8 +407,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = member _.TrySync log streamName (Token.Unpack token as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { let streamVersion = token.streamVersion - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.Conflict actualVersion -> return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> @@ -426,8 +424,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData[]) : Async = async { let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.Conflict actualVersion -> return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) | EsSyncResult.Written wr -> @@ -493,10 +490,8 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod | Some (AccessStrategy.RollingSnapshots (_, compact)) -> let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events - let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate - match syncRes with + match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with | GatewaySyncResult.ConflictUnknown _ -> return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> @@ -504,26 +499,20 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod module Caching = /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> unit) = - let intercept streamName tokenAndState = - let _ = tee streamName tokenAndState - tokenAndState - - let loadAndIntercept load streamName = async { - let! tokenAndState = load - return intercept streamName tokenAndState } - + type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = + let cache streamName inner = async { + let! tokenAndState = inner + updateCache streamName tokenAndState + return tokenAndState } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName : string, opt) : Async = - loadAndIntercept (inner.Load(log, streamName, opt)) streamName - - member _.TrySync(log : ILogger, stream, token, state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, stream, token, state, events, context) - match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream) + inner.Load(log, streamName, opt) |> cache streamName + member _.TrySync(log : ILogger, streamName, token, state, events : 'event list, context) : Async> = async { + match! inner.TrySync(log, streamName, token, state, events, context) with + | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) | SyncResult.Written (token', state') -> - let intercepted = intercept stream (token', state') - return SyncResult.Written intercepted } + updateCache streamName (token', state') + return SyncResult.Written (token', state') } let applyCacheUpdatesWithSlidingExpiration (cache : ICache) @@ -555,15 +544,13 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName - | Some (cache : ICache, prefix : string) -> async { + | Some (cache : ICache, prefix : string) -> match cache.TryGet(prefix + streamName) with - | None -> return! batched log streamName - | Some tokenAndState when allowStale -> return tokenAndState - | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } - - member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context) - match syncRes with + | None -> batched log streamName + | Some tokenAndState when allowStale -> async { return tokenAndState } + | Some (token, state) -> category.LoadFromToken fold state streamName token log + member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async> = async { + match! category.TrySync(log, fold, streamName, token, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } @@ -597,7 +584,6 @@ type EventStoreCategory<'event, 'state, 'context> + "mixing AccessStrategy.LatestKnownEvent with Caching at present." |> invalidOp | _ -> () - let inner = Category<'event, 'state, 'context>(context, codec, ?access = access) let readCacheOption = match caching with @@ -605,9 +591,7 @@ type EventStoreCategory<'event, 'state, 'context> | Some (CachingStrategy.SlidingWindow (cache, _)) | Some (CachingStrategy.FixedTimeSpan (cache, _)) -> Some (cache, null) | Some (CachingStrategy.SlidingWindowPrefixed (cache, _, prefix)) -> Some (cache, prefix) - let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption) - let category : ICategory<_, _, _, 'context> = match caching with | None -> folder :> _ diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 55a7309ce..7fd4819c0 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -136,8 +136,7 @@ module private Write = /// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) : Async = async { - try - let! wr = conn.AppendToStream(StreamId streamName, (if version = -1L then ExpectedVersion.NoStream else int version), events) |> Async.AwaitTaskCorrect + try let! wr = conn.AppendToStream(StreamId streamName, (if version = -1L then ExpectedVersion.NoStream else int version), events) |> Async.AwaitTaskCorrect return EsSyncResult.Written wr with :? WrongExpectedVersionException as ex -> log.Information(ex, "SqlEs TrySync WrongExpectedVersionException writing {EventTypes}, expected {ExpectedVersion}", @@ -379,8 +378,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } member _.TrySync log streamName (Token.Unpack pos as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async = async { - let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents - match wr with + match! Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents with | EsSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown | EsSyncResult.Written wr -> @@ -396,8 +394,7 @@ type SqlStreamStoreContext(connection : SqlStreamStoreConnection, batching : Bat return GatewaySyncResult.Written token } member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData[]) : Async = async { let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent - let! wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents - match wr with + match! Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown | EsSyncResult.Written wr -> @@ -456,10 +453,8 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, | Some (AccessStrategy.RollingSnapshots (_, compact)) -> let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events - let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate - match syncRes with + match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with | GatewaySyncResult.ConflictUnknown -> return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> @@ -467,23 +462,20 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, module Caching = /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> unit) = - let intercept streamName tokenAndState = - let _ = tee streamName tokenAndState - tokenAndState - let loadAndIntercept load streamName = async { - let! tokenAndState = load - return intercept streamName tokenAndState } + type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = + let cache streamName inner = async { + let! tokenAndState = inner + updateCache streamName tokenAndState + return tokenAndState } interface ICategory<'event, 'state, string, 'context> with member _.Load(log, streamName : string, opt) : Async = - loadAndIntercept (inner.Load(log, streamName, opt)) streamName + inner.Load(log, streamName, opt) |> cache streamName member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { - let! syncRes = inner.TrySync(log, streamName, streamToken, state, events, context) - match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync streamName) + match! inner.TrySync(log, streamName, streamToken, state, events, context) with + | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) | SyncResult.Written (token', state') -> - let intercepted = intercept streamName (token', state') - return SyncResult.Written intercepted } + updateCache streamName (token', state') + return SyncResult.Written (token', state') } let applyCacheUpdatesWithSlidingExpiration (cache : ICache) @@ -515,14 +507,13 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName - | Some (cache : ICache, prefix : string) -> async { + | Some (cache : ICache, prefix : string) -> match cache.TryGet(prefix + streamName) with - | None -> return! batched log streamName - | Some tokenAndState when allowStale -> return tokenAndState - | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } + | None -> batched log streamName + | Some tokenAndState when allowStale -> async { return tokenAndState } + | Some (token, state) -> category.LoadFromToken fold state streamName token log member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async> = async { - let! syncRes = category.TrySync(log, fold, streamName, streamToken, initialState, events, context) - match syncRes with + match! category.TrySync(log, fold, streamName, streamToken, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } From e63550466e4b362c5dfa5b6d4e56222f73aec350 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 9 Mar 2022 09:58:43 +0000 Subject: [PATCH 3/6] Extrat common CachingDecorator --- src/Equinox.Core/Cache.fs | 46 +++++++++++++++++++ src/Equinox.EventStore/EventStore.fs | 47 ++------------------ src/Equinox.SqlStreamStore/SqlStreamStore.fs | 47 ++------------------ 3 files changed, 52 insertions(+), 88 deletions(-) diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index cc1107658..d04a4c20c 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -24,6 +24,52 @@ type ICache = abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> unit abstract member TryGet : key: string -> (StreamToken * 'state) option +module Caching = + + type internal CachingDecorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = + + let cache streamName inner = async { + let! tokenAndState = inner + updateCache streamName tokenAndState + return tokenAndState } + + interface ICategory<'event, 'state, string, 'context> with + member _.Load(log, streamName : string, opt) : Async = + inner.Load(log, streamName, opt) |> cache streamName + + member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { + match! inner.TrySync(log, streamName, streamToken, state, events, context) with + | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) + | SyncResult.Written (token', state') -> + updateCache streamName (token', state') + return SyncResult.Written (token', state') } + + let applyCacheUpdatesWithSlidingExpiration + (cache : ICache) + (prefix : string) + (slidingExpiration : TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes) + let options = CacheItemOptions.RelativeExpiration slidingExpiration + let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + CachingDecorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ + + let applyCacheUpdatesWithFixedTimeSpan + (cache : ICache) + (prefix : string) + (lifetime : TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes) + let addOrUpdateFixedLifetimeCacheEntry streamName value = + let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add lifetime + let options = CacheItemOptions.AbsoluteExpiration expirationPoint + cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + CachingDecorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ + namespace Equinox open System.Runtime.Caching diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 12d6abd54..016bf5d5b 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -497,47 +497,6 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } -module Caching = - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = - let cache streamName inner = async { - let! tokenAndState = inner - updateCache streamName tokenAndState - return tokenAndState } - interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName : string, opt) : Async = - inner.Load(log, streamName, opt) |> cache streamName - member _.TrySync(log : ILogger, streamName, token, state, events : 'event list, context) : Async> = async { - match! inner.TrySync(log, streamName, token, state, events, context) with - | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) - | SyncResult.Written (token', state') -> - updateCache streamName (token', state') - return SyncResult.Written (token', state') } - - let applyCacheUpdatesWithSlidingExpiration - (cache : ICache) - (prefix : string) - (slidingExpiration : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let options = CacheItemOptions.RelativeExpiration slidingExpiration - let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ - - let applyCacheUpdatesWithFixedTimeSpan - (cache : ICache) - (prefix : string) - (period : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let addOrUpdateFixedLifetimeCacheEntry streamName value = - let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period - let options = CacheItemOptions.AbsoluteExpiration expirationPoint - cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ - type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with @@ -596,11 +555,11 @@ type EventStoreCategory<'event, 'state, 'context> match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow (cache, window)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes | Some (CachingStrategy.FixedTimeSpan (cache, period)) -> - Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder + Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes let resolve streamName = category, FsCodec.StreamName.toString streamName, None let empty = context.TokenEmpty, initial let storeCategory = StoreCategory(resolve, empty) diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 7fd4819c0..04c17b77a 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -460,47 +460,6 @@ type private Category<'event, 'state, 'context>(context : SqlStreamStoreContext, | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } -module Caching = - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = - let cache streamName inner = async { - let! tokenAndState = inner - updateCache streamName tokenAndState - return tokenAndState } - interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName : string, opt) : Async = - inner.Load(log, streamName, opt) |> cache streamName - member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { - match! inner.TrySync(log, streamName, streamToken, state, events, context) with - | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) - | SyncResult.Written (token', state') -> - updateCache streamName (token', state') - return SyncResult.Written (token', state') } - - let applyCacheUpdatesWithSlidingExpiration - (cache : ICache) - (prefix : string) - (slidingExpiration : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let options = CacheItemOptions.RelativeExpiration slidingExpiration - let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ - - let applyCacheUpdatesWithFixedTimeSpan - (cache : ICache) - (prefix : string) - (lifetime : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes) - let addOrUpdateFixedLifetimeCacheEntry streamName value = - let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add lifetime - let options = CacheItemOptions.AbsoluteExpiration expirationPoint - cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ - type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = let batched log streamName = category.Load fold initial streamName log interface ICategory<'event, 'state, string, 'context> with @@ -559,11 +518,11 @@ type SqlStreamStoreCategory<'event, 'state, 'context> match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow (cache, window)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes | Some (CachingStrategy.FixedTimeSpan (cache, period)) -> - Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder + Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder + Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes let resolve streamName = category, FsCodec.StreamName.toString streamName, None let empty = context.TokenEmpty, initial let storeCategory = StoreCategory<'event, 'state, FsCodec.StreamName, 'context>(resolve, empty) From 00793ae502953abab678c57a032b821eda9c6e0a Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 9 Mar 2022 10:04:14 +0000 Subject: [PATCH 4/6] Shift to own file --- src/Equinox.Core/Cache.fs | 46 ------------------------------------- src/Equinox.Core/Caching.fs | 45 ++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 46 deletions(-) create mode 100644 src/Equinox.Core/Caching.fs diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index d04a4c20c..cc1107658 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -24,52 +24,6 @@ type ICache = abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> unit abstract member TryGet : key: string -> (StreamToken * 'state) option -module Caching = - - type internal CachingDecorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = - - let cache streamName inner = async { - let! tokenAndState = inner - updateCache streamName tokenAndState - return tokenAndState } - - interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName : string, opt) : Async = - inner.Load(log, streamName, opt) |> cache streamName - - member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { - match! inner.TrySync(log, streamName, streamToken, state, events, context) with - | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) - | SyncResult.Written (token', state') -> - updateCache streamName (token', state') - return SyncResult.Written (token', state') } - - let applyCacheUpdatesWithSlidingExpiration - (cache : ICache) - (prefix : string) - (slidingExpiration : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - supersedes - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes) - let options = CacheItemOptions.RelativeExpiration slidingExpiration - let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CachingDecorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ - - let applyCacheUpdatesWithFixedTimeSpan - (cache : ICache) - (prefix : string) - (lifetime : TimeSpan) - (category : ICategory<'event, 'state, string, 'context>) - supersedes - : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes) - let addOrUpdateFixedLifetimeCacheEntry streamName value = - let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add lifetime - let options = CacheItemOptions.AbsoluteExpiration expirationPoint - cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) - CachingDecorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ - namespace Equinox open System.Runtime.Caching diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs new file mode 100644 index 000000000..35ae024c1 --- /dev/null +++ b/src/Equinox.Core/Caching.fs @@ -0,0 +1,45 @@ +module Equinox.Core.Caching + +type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = + + let cache streamName inner = async { + let! tokenAndState = inner + updateCache streamName tokenAndState + return tokenAndState } + + interface ICategory<'event, 'state, string, 'context> with + member _.Load(log, streamName : string, opt) : Async = + inner.Load(log, streamName, opt) |> cache streamName + + member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { + match! inner.TrySync(log, streamName, streamToken, state, events, context) with + | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) + | SyncResult.Written (token', state') -> + updateCache streamName (token', state') + return SyncResult.Written (token', state') } + +let applyCacheUpdatesWithSlidingExpiration + (cache : ICache) + (prefix : string) + (slidingExpiration : System.TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes) + let options = CacheItemOptions.RelativeExpiration slidingExpiration + let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + Decorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ + +let applyCacheUpdatesWithFixedTimeSpan + (cache : ICache) + (prefix : string) + (lifetime : System.TimeSpan) + (category : ICategory<'event, 'state, string, 'context>) + supersedes + : ICategory<'event, 'state, string, 'context> = + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes) + let addOrUpdateFixedLifetimeCacheEntry streamName value = + let expirationPoint = let creationDate = System.DateTimeOffset.UtcNow in creationDate.Add lifetime + let options = CacheItemOptions.AbsoluteExpiration expirationPoint + cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) + Decorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _ From 57005f8ec8ef4386e8ec5205d530d2afe68e691d Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 9 Mar 2022 10:04:14 +0000 Subject: [PATCH 5/6] Shift to own file --- src/Equinox.Core/Equinox.Core.fsproj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 979a1be0f..8970d0d68 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -16,6 +16,7 @@ + From 0902721aed154d3d16848015706f9c19e6afbabf Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 10 Mar 2022 10:23:56 +0000 Subject: [PATCH 6/6] Revert Asyncness removal --- src/Equinox.Core/Cache.fs | 21 ++++++++++---------- src/Equinox.Core/Caching.fs | 6 +++--- src/Equinox.CosmosStore/CosmosStore.fs | 16 +++++++-------- src/Equinox.EventStore/EventStore.fs | 10 +++++----- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 10 +++++----- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index cc1107658..f98b6376b 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -21,8 +21,8 @@ type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, superse currentToken, currentState type ICache = - abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> unit - abstract member TryGet : key: string -> (StreamToken * 'state) option + abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> Async + abstract member TryGet : key: string -> Async<(StreamToken * 'state) option> namespace Equinox @@ -41,15 +41,16 @@ type Cache(name, sizeMb : int) = | RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative) interface ICache with - member _.UpdateIfNewer(key, options, entry) = + member _.UpdateIfNewer(key, options, entry) = async { let policy = toPolicy options match cache.AddOrGetExisting(key, box entry, policy) with | null -> () | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry - | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x - - member _.TryGet key = - match cache.Get key with - | null -> None - | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x + | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x } + + member _.TryGet key = async { + return + match cache.Get key with + | null -> None + | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value + | x -> failwithf "TryGet Incompatible cache entry %A" x } diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs index 35ae024c1..db64acd20 100644 --- a/src/Equinox.Core/Caching.fs +++ b/src/Equinox.Core/Caching.fs @@ -1,10 +1,10 @@ module Equinox.Core.Caching -type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) = +type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> Async) = let cache streamName inner = async { let! tokenAndState = inner - updateCache streamName tokenAndState + do! updateCache streamName tokenAndState return tokenAndState } interface ICategory<'event, 'state, string, 'context> with @@ -15,7 +15,7 @@ type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'sta match! inner.TrySync(log, streamName, streamToken, state, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName) | SyncResult.Written (token', state') -> - updateCache streamName (token', state') + do! updateCache streamName (token', state') return SyncResult.Written (token', state') } let applyCacheUpdatesWithSlidingExpiration diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index faa99bfc3..54d8d1c35 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1133,20 +1133,20 @@ module internal Caching = mapUnfolds : Choice 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) = let cache streamName inner = async { let! tokenAndState = inner - updateCache streamName tokenAndState + do! updateCache streamName tokenAndState return tokenAndState } interface ICategory<'event, 'state, string, 'context> with - member _.Load(log, streamName, allowStale) : Async = - match tryReadCache streamName with - | None -> category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName - | Some tokenAndState when allowStale -> async { return tokenAndState } // read already updated TTL, no need to write - | Some (token, state) -> category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName + member _.Load(log, streamName, allowStale) : Async = async { + match! tryReadCache streamName with + | None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName + | Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write + | Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName } member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async> = async { match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with | SyncResult.Conflict resync -> return SyncResult.Conflict (cache streamName resync) | SyncResult.Written (token', state') -> - updateCache streamName (token', state') + do! updateCache streamName (token', state') return SyncResult.Written (token', state') } module ConnectionString = @@ -1430,7 +1430,7 @@ type CosmosStoreCategory<'event, 'state, 'context> let createCategory _name : ICategory<_, _, string, 'context> = let tryReadCache, updateCache = match caching with - | CachingStrategy.NoCaching -> (fun _ -> None), fun _ _ -> () + | CachingStrategy.NoCaching -> (fun _ -> async { return None }), fun _ _ -> async { () } | CachingStrategy.SlidingWindow (cache, window) -> cache.TryGet, Caching.applyCacheUpdatesWithSlidingExpiration (cache, null) window | CachingStrategy.FixedTimeSpan (cache, period) -> cache.TryGet, Caching.applyCacheUpdatesWithFixedTimeSpan (cache, null) period let isOrigin, checkUnfolds, mapUnfolds = diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 016bf5d5b..dcf906855 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -503,11 +503,11 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName - | Some (cache : ICache, prefix : string) -> - match cache.TryGet(prefix + streamName) with - | None -> batched log streamName - | Some tokenAndState when allowStale -> async { return tokenAndState } - | Some (token, state) -> category.LoadFromToken fold state streamName token log + | Some (cache : ICache, prefix : string) -> async { + match! cache.TryGet(prefix + streamName) with + | None -> return! batched log streamName + | Some tokenAndState when allowStale -> return tokenAndState + | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async> = async { match! category.TrySync(log, fold, streamName, token, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 04c17b77a..32c006c6e 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -466,11 +466,11 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state member _.Load(log, streamName, allowStale) : Async = match readCache with | None -> batched log streamName - | Some (cache : ICache, prefix : string) -> - match cache.TryGet(prefix + streamName) with - | None -> batched log streamName - | Some tokenAndState when allowStale -> async { return tokenAndState } - | Some (token, state) -> category.LoadFromToken fold state streamName token log + | Some (cache : ICache, prefix : string) -> async { + match! cache.TryGet(prefix + streamName) with + | None -> return! batched log streamName + | Some tokenAndState when allowStale -> return tokenAndState + | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async> = async { match! category.TrySync(log, fold, streamName, streamToken, initialState, events, context) with | SyncResult.Conflict resync -> return SyncResult.Conflict resync