Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving Caching logic to central file #318

Merged
merged 6 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module Equinox.Core.Caching

type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> Async<unit>) =

let cache streamName inner = async {
let! tokenAndState = inner
do! updateCache streamName tokenAndState
return tokenAndState }

interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
inner.Load(log, streamName, opt) |> cache streamName

member _.TrySync(log : Serilog.ILogger, streamName, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! inner.TrySync(log, streamName, streamToken, state, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName)
| SyncResult.Written (token', state') ->
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

let applyCacheUpdatesWithSlidingExpiration
(cache : ICache)
(prefix : string)
(slidingExpiration : System.TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
supersedes
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, supersedes)
let options = CacheItemOptions.RelativeExpiration slidingExpiration
let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
Decorator<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

let applyCacheUpdatesWithFixedTimeSpan
(cache : ICache)
(prefix : string)
(lifetime : System.TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
supersedes
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, supersedes)
let addOrUpdateFixedLifetimeCacheEntry streamName value =
let expirationPoint = let creationDate = System.DateTimeOffset.UtcNow in creationDate.Add lifetime
let options = CacheItemOptions.AbsoluteExpiration expirationPoint
cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
Decorator<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _
1 change: 1 addition & 0 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="Cache.fs" />
<Compile Include="AsyncBatchingGate.fs" />
<Compile Include="Caching.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 6 additions & 11 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,23 +1132,22 @@ module internal Caching =
checkUnfolds, compressUnfolds,
mapUnfolds : Choice<unit, 'event list -> 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) =
let cache streamName inner = async {
let! ts = inner
do! updateCache streamName ts
return ts }
let! tokenAndState = inner
do! updateCache streamName tokenAndState
return tokenAndState }
interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> = async {
match! tryReadCache streamName with
| None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write
| Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context)
: Async<SyncResult<'state>> = async {
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
| SyncResult.Conflict resync ->
return SyncResult.Conflict (cache streamName resync)
| SyncResult.Written (token', state') ->
let! res = cache streamName (async { return token', state' })
return SyncResult.Written res }
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

module ConnectionString =

Expand All @@ -1159,12 +1158,10 @@ module ConnectionString =

namespace Equinox.CosmosStore

open Equinox
open Equinox.Core
open Equinox.CosmosStore.Core
open FsCodec
open Microsoft.Azure.Cosmos
open Serilog
open System

[<RequireQualifiedAccess; NoComparison>]
Expand Down Expand Up @@ -1447,13 +1444,11 @@ type CosmosStoreCategory<'event, 'state, 'context>
let cosmosCat = Category<'event, 'state, 'context>(container, codec)
Caching.CachingCategory<'event, 'state, 'context>(cosmosCat, fold, initial, isOrigin, tryReadCache, updateCache, checkUnfolds, compressUnfolds, mapUnfolds) :> _
categories.GetOrAdd(categoryName, createCategory)

let resolve (StreamName.CategoryAndId (categoryName, streamId)) =
let container, streamName, maybeContainerInitializationGate = context.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId)
resolveCategory (categoryName, container), streamName, maybeContainerInitializationGate
let empty = Token.create Position.fromKnownEmpty, initial
let storeCategory = StoreCategory(resolve, empty)

member _.Resolve(streamName, ?context) = storeCategory.Resolve(streamName, ?context = context)

namespace Equinox.CosmosStore.Core
Expand Down
75 changes: 9 additions & 66 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ module private Write =
/// Yields `EsSyncResult.Written` or `EsSyncResult.Conflict` to signify WrongExpectedVersion
let private writeEventsAsync (log : ILogger) (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[])
: Async<EsSyncResult> = async {
try
let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect
try let! wr = conn.AppendToStreamAsync(streamName, version, events) |> Async.AwaitTaskCorrect
return EsSyncResult.Written wr
with :? EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex ->
log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}",
Expand Down Expand Up @@ -408,8 +407,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) =

member _.TrySync log streamName (Token.Unpack token as streamToken) (events, encodedEvents : EventData array) isCompactionEventType : Async<GatewaySyncResult> = async {
let streamVersion = token.streamVersion
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents
match wr with
match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with
| EsSyncResult.Conflict actualVersion ->
return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion)
| EsSyncResult.Written wr ->
Expand All @@ -426,8 +424,7 @@ type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) =

member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData<byte[]>[]) : Async<GatewaySyncResult> = async {
let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents
match wr with
match! Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents with
| EsSyncResult.Conflict actualVersion ->
return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion)
| EsSyncResult.Written wr ->
Expand Down Expand Up @@ -493,62 +490,13 @@ type private Category<'event, 'state, 'context>(context : EventStoreContext, cod
| Some (AccessStrategy.RollingSnapshots (_, compact)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [fold state events |> compact] else events

let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq
let! syncRes = context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate
match syncRes with
match! context.TrySync log streamName streamToken (events, encodedEvents) compactionPredicate with
| GatewaySyncResult.ConflictUnknown _ ->
return SyncResult.Conflict (load fold state (context.LoadFromToken true streamName log streamToken (tryDecode, compactionPredicate)))
| GatewaySyncResult.Written token' ->
return SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState }

let loadAndIntercept load streamName = async {
let! tokenAndState = load
return! intercept streamName tokenAndState }

interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName, opt)) streamName

member _.TrySync(log : ILogger, stream, token, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, stream, token, state, events, context)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream)
| SyncResult.Written (token', state') ->
let! intercepted = intercept stream (token', state')
return SyncResult.Written intercepted }

let applyCacheUpdatesWithSlidingExpiration
(cache : ICache)
(prefix : string)
(slidingExpiration : TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let options = CacheItemOptions.RelativeExpiration slidingExpiration
let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

let applyCacheUpdatesWithFixedTimeSpan
(cache : ICache)
(prefix : string)
(period : TimeSpan)
(category : ICategory<'event, 'state, string, 'context>)
: ICategory<'event, 'state, string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let addOrUpdateFixedLifetimeCacheEntry streamName value =
let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period
let options = CacheItemOptions.AbsoluteExpiration expirationPoint
cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
CategoryTee<'event, 'state, 'context>(category, addOrUpdateFixedLifetimeCacheEntry) :> _

type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) =
let batched log streamName = category.Load fold initial streamName log
interface ICategory<'event, 'state, string, 'context> with
Expand All @@ -560,10 +508,8 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
| None -> return! batched log streamName
| Some tokenAndState when allowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }

member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context)
match syncRes with
member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.TrySync(log, fold, streamName, token, initialState, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }

Expand Down Expand Up @@ -597,26 +543,23 @@ type EventStoreCategory<'event, 'state, 'context>
+ "mixing AccessStrategy.LatestKnownEvent with Caching at present."
|> invalidOp
| _ -> ()

let inner = Category<'event, 'state, 'context>(context, codec, ?access = access)
let readCacheOption =
match caching with
| None -> None
| Some (CachingStrategy.SlidingWindow (cache, _))
| Some (CachingStrategy.FixedTimeSpan (cache, _)) -> Some (cache, null)
| Some (CachingStrategy.SlidingWindowPrefixed (cache, _, prefix)) -> Some (cache, prefix)

let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption)

let category : ICategory<_, _, _, 'context> =
match caching with
| None -> folder :> _
| Some (CachingStrategy.SlidingWindow (cache, window)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes
| Some (CachingStrategy.FixedTimeSpan (cache, period)) ->
Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder
Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes
| Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder
Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes
let resolve streamName = category, FsCodec.StreamName.toString streamName, None
let empty = context.TokenEmpty, initial
let storeCategory = StoreCategory(resolve, empty)
Expand Down
Loading