Skip to content

Commit

Permalink
Moving Caching logic to central file (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Mar 10, 2022
1 parent 0767c57 commit 6a9093d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 135 deletions.
45 changes: 45 additions & 0 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module Equinox.Core.Caching

type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> Async<unit>) =

let cache streamName inner = async {
let! tokenAndState = inner
do! updateCache streamName tokenAndState
return tokenAndState }

interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
inner.Load(log, streamName, opt) |> cache streamName

member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! inner.TrySync(log, streamName, streamToken, state, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName)
| SyncResult.Written (token', state') ->
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

let applyCacheUpdatesWithSlidingExpiration
(cache : ICache)
(prefix : string)
(slidingExpiration : System.TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
supersedes
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes)
let options = CacheItemOptions.RelativeExpiration slidingExpiration
let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
Decorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

let applyCacheUpdatesWithFixedTimeSpan
(cache : ICache)
(prefix : string)
(lifetime : System.TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
supersedes
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes)
let addOrUpdateFixedLifetimeCacheEntry streamName value =
let expirationPoint = let creationDate = System.DateTimeOffset.UtcNow in creationDate.Add lifetime
let options = CacheItemOptions.AbsoluteExpiration expirationPoint
cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
Decorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _
1 change: 1 addition & 0 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="Cache.fs" />
<Compile Include="AsyncBatchingGate.fs" />
<Compile Include="Caching.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 6 additions & 11 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,23 +1132,22 @@ module internal Caching =
checkUnfolds, compressUnfolds,
mapUnfolds : Choice<unit, 'event list -> 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) =
let cache streamName inner = async {
let! ts = inner
do! updateCache streamName ts
return ts }
let! tokenAndState = inner
do! updateCache streamName tokenAndState
return tokenAndState }
interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> = async {
match! tryReadCache streamName with
| None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write
| Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context)
: Async<SyncResult<'state>> = async {
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
| SyncResult.Conflict resync ->
return SyncResult.Conflict (cache streamName resync)
| SyncResult.Written (token', state') ->
let! res = cache streamName (async { return token', state' })
return SyncResult.Written res }
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

module ConnectionString =

Expand All @@ -1159,12 +1158,10 @@ module ConnectionString =

namespace Equinox.CosmosStore

open Equinox
open Equinox.Core
open Equinox.CosmosStore.Core
open FsCodec
open Microsoft.Azure.Cosmos
open Serilog
open System

[<RequireQualifiedAccess; NoComparison>]
Expand Down Expand Up @@ -1447,13 +1444,11 @@ type CosmosStoreCategory<'event, 'state, 'context>
let cosmosCat = Category<'event, 'state, 'context>(container, codec)
Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _
categories.GetOrAdd(categoryName, createCategory)

let resolve (StreamName.CategoryAndId (categoryName, streamId)) =
let container, streamName, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId)
resolveCategory (categoryName, container), streamName, maybeContainerInitializationGate
let empty = Token.create Position.fromKnownEmpty, initial
let storeCategory = StoreCategory(resolve, empty)

member _.Resolve(streamName, ?context) = storeCategory.Resolve(streamName, ?context = context)

namespace Equinox.CosmosStore.Core
Expand Down
75 changes: 9 additions & 66 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ module private Write =
/// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion
let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[])
: Async<EsSyncResult> = async {
try
let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect
try let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect
return EsSyncResult.Written wr
with :? EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex ->
log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}",
Expand Down Expand Up @@ -408,8 +407,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) =

member _.TrySync log streamName (Token.Unpack token as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async<GatewaySyncResult> = async {
let streamVersion = token.streamVersion
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents
match wr with
match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with
| EsSyncResult.Conflict actualVersion ->
return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion)
| EsSyncResult.Written wr ->
Expand All @@ -426,8 +424,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) =

member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData<byte[]>[]) : Async<GatewaySyncResult> = async {
let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents
match wr with
match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with
| EsSyncResult.Conflict actualVersion ->
return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion)
| EsSyncResult.Written wr ->
Expand Down Expand Up @@ -493,62 +490,13 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod
| Some (AccessStrategy.RollingSnapshots (_, compact)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [fold state events |> compact] else events

let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq
let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate
match syncRes with
match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with
| GatewaySyncResult.ConflictUnknown _ ->
return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate)))
| GatewaySyncResult.Written token' ->
return SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState }

let loadAndIntercept load streamName = async {
let! tokenAndState = load
return! intercept streamName tokenAndState }

interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName, opt)) streamName

member _.TrySync(log : ILogger, stream, token, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, stream, token, state, events, context)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream)
| SyncResult.Written (token', state') ->
let! intercepted = intercept stream (token', state')
return SyncResult.Written intercepted }

let applyCacheUpdatesWithSlidingExpiration
(cache : ICache)
(prefix : string)
(slidingExpiration : TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let options = CacheItemOptions.RelativeExpiration slidingExpiration
let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

let applyCacheUpdatesWithFixedTimeSpan
(cache : ICache)
(prefix : string)
(period : TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let addOrUpdateFixedLifetimeCacheEntry streamName value =
let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period
let options = CacheItemOptions.AbsoluteExpiration expirationPoint
cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _

type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) =
let batched log streamName = category.Load fold initial streamName log
interface ICategory<'event, 'state, string, 'context> with
Expand All @@ -560,10 +508,8 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| None -> return! batched log streamName
| Some tokenAndState when allowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }

member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context)
match syncRes with
member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.TrySync(log, fold, streamName, token, initialState, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }

Expand Down Expand Up @@ -597,26 +543,23 @@ type EventStoreCategory<'event, 'state, 'context>
+ "mixing AccessStrategy.LatestKnownEvent with Caching at present."
|> invalidOp
| _ -> ()

let inner = Category<'event, 'state, 'context>(context, codec, ?access = access)
let readCacheOption =
match caching with
| None -> None
| Some (CachingStrategy.SlidingWindow (cache, _))
| Some (CachingStrategy.FixedTimeSpan (cache, _)) -> Some (cache, null)
| Some (CachingStrategy.SlidingWindowPrefixed (cache, _, prefix)) -> Some (cache, prefix)

let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption)

let category : ICategory<_, _, _, 'context> =
match caching with
| None -> folder :> _
| Some (CachingStrategy.SlidingWindow (cache, window)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes
| Some (CachingStrategy.FixedTimeSpan (cache, period)) ->
Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder
Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes
| Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes
let resolve streamName = category, FsCodec.StreamName.toString streamName, None
let empty = context.TokenEmpty, initial
let storeCategory = StoreCategory(resolve, empty)
Expand Down
Loading

0 comments on commit 6a9093d

Please sign in to comment.