Skip to content

Commit

Permalink
Remove optionality from Cosmos caching (#107) resolves #104
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Mar 5, 2019
1 parent f166b8b commit 365f8ef
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 35 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

The repo is versioned based on [SemVer 2.0](https://semver.org/spec/v2.0.0.html) using the tiny-but-mighty [MinVer](https://github.com/adamralph/minver) from [@adamralph](https://github.com/adamralph). [See here](https://github.com/adamralph/minver#how-it-works) for more information on how it works.

All notable changes to this project will be documented in this file.
All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
The `Unreleased` section name is replaced by the expected version of next release. A stable version's log contains all changes between that version and the previous stable version (can duplicate the prereleases logs).

_NB at the present time, this project does not adhere strictly to Semantic Versioning - small binary-breaking changes may occur without a change in Major at the until this notice is removed (it will be!)._
Expand All @@ -13,6 +12,10 @@ _NB at the present time, this project does not adhere strictly to Semantic Versi

### Added
### Changed

- Make `caching` non-optional in CosmosStreamResolver; add `NoCaching` cache mode for `Equinox.Cosmos` [#104](https://github.com/jet/equinox/issues/104) @jakzale
- Reorder `caching` and `access` in `GesStreamResolver` to match `CosmosStreamResolver` [#107](https://github.com/jet/equinox/issues/107)

### Removed
### Fixed

Expand Down
8 changes: 4 additions & 4 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type StreamResolver(storage) =
match storage with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.MemoryResolver(store, fold, initial).Resolve
| Storage.StorageConfig.Es (gateway, cache, unfolds) ->
| Storage.StorageConfig.Es (gateway, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve
| Storage.StorageConfig.Cosmos (gateway, cache, unfolds, databaseId, collectionId) ->
Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
| Storage.StorageConfig.Cosmos (gateway, caching, unfolds, databaseId, collectionId) ->
let store = Equinox.Cosmos.CosmosStore(gateway, databaseId, collectionId)
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None
Equinox.Cosmos.CosmosResolver<'event,'state>(store, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve
Equinox.Cosmos.CosmosResolver<'event,'state>(store, codec, fold, initial, caching, ?access = accessStrategy).Resolve

type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)
Expand Down
6 changes: 3 additions & 3 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ let defaultBatchSize = 500
type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
| Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.Cosmos.CosmosGateway * Equinox.Cosmos.CachingStrategy option * unfolds: bool * databaseId: string * collectionId: string
| Cosmos of Equinox.Cosmos.CosmosGateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * collectionId: string

module MemoryStore =
let config () =
Expand Down Expand Up @@ -129,6 +129,6 @@ module Cosmos =
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn (defaultBatchSize,pageSize), cacheStrategy, unfolds, dbName, collName)
6 changes: 3 additions & 3 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ let createServiceMemory log store =
let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithRollingSnapshots gateway =
GesResolver(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Resolve
GesResolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesResolver(gateway, codec, fold, initial).Resolve

let resolveCosmosStreamWithProjection gateway =
CosmosResolver(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
CosmosResolver(gateway, codec, fold, initial).Resolve
CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching).Resolve

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ let createServiceMemory log store =

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithOptimizedStorageSemantics gateway =
GesResolver(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Resolve
GesResolver(gateway 1, codec, fold, initial, access = AccessStrategy.EventsAreState).Resolve
let resolveStreamGesWithoutAccessStrategy gateway =
GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve

let resolveStreamCosmosWithKnownEventTypeSemantics gateway =
CosmosResolver(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Resolve
CosmosResolver(gateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.AnyKnownEventType).Resolve
let resolveStreamCosmosWithoutCustomAccessStrategy gateway =
CosmosResolver(gateway defaultBatchSize, codec, fold, initial).Resolve
CosmosResolver(gateway defaultBatchSize, codec, fold, initial, CachingStrategy.NoCaching).Resolve

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ let createServiceMemory log store =

let codec = genCodec<Domain.Favorites.Events.Event>()
let createServiceGes gateway log =
let resolveStream = GesResolver(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveStream = GesResolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
Backend.Favorites.Service(log, resolveStream)

let createServiceCosmos gateway log =
let resolveStream = CosmosResolver(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
let resolveStream = CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Store =

module FavoritesCategory =
let codec = Equinox.UnionCodec.JsonUtf8.Create<Favorites.Event>(Newtonsoft.Json.JsonSerializerSettings())
let resolve = CosmosResolver(Store.store, codec, Favorites.fold, Favorites.initial, caching=Store.cacheStrategy).Resolve
let resolve = CosmosResolver(Store.store, codec, Favorites.fold, Favorites.initial, Store.cacheStrategy).Resolve

let service = Favorites.Service(Log.log, FavoritesCategory.resolve)

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ module Store =
module TodosCategory =
let codec = Equinox.UnionCodec.JsonUtf8.Create<Event>(Newtonsoft.Json.JsonSerializerSettings())
let access = Equinox.Cosmos.AccessStrategy.Snapshot (isOrigin,compact)
let resolve = CosmosResolver(Store.store, codec, fold, initial, access=access, caching=Store.cacheStrategy).Resolve
let resolve = CosmosResolver(Store.store, codec, fold, initial, Store.cacheStrategy, access=access).Resolve

let service = Service(log, TodosCategory.resolve)

Expand Down
17 changes: 12 additions & 5 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,13 @@ type CosmosStore(gateway: CosmosGateway, collections: CosmosCollections, [<O; D(

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
/// Do not apply any caching strategy for this Stream.
/// NB opting not to leverage caching when using CosmosDb can have significant implications for the scalability
/// of your application, both in terms of latency and running costs.
/// While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy`
/// [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides
/// a direct benefit in terms of the number of Request Unit (RU)s that need to be provisioned to your CosmosDb instances.
| NoCaching
/// Retain a single 'state per streamName, together with the associated etag
/// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
/// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip
Expand All @@ -890,11 +897,11 @@ type AccessStrategy<'event,'state> =
/// Trust every event type as being an origin
| AnyKnownEventType

type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, [<O; D(null)>]?access, [<O; D(null)>]?caching) =
type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, caching, [<O; D(null)>]?access) =
let readCacheOption =
match caching with
| None -> None
| Some (CachingStrategy.SlidingWindow(cache, _)) -> Some(cache, null)
| CachingStrategy.NoCaching -> None
| CachingStrategy.SlidingWindow(cache, _) -> Some(cache, null)
let isOrigin, projectOption =
match access with
| None -> (fun _ -> false), None
Expand All @@ -905,8 +912,8 @@ type CosmosResolver<'event, 'state>(store : CosmosStore, codec, fold, initial, [
let folder = Folder<'event, 'state>(cosmosCat, fold, initial, isOrigin, ?unfold=projectOption, ?readCache = readCacheOption)
let category : Store.ICategory<_,_,CollectionStream> =
match caching with
| None -> folder :> _
| Some (CachingStrategy.SlidingWindow(cache, window)) ->
| CachingStrategy.NoCaching -> folder :> _
| CachingStrategy.SlidingWindow(cache, window) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let resolveStream (streamId, maybeCollectionInitializationGate) =
Expand Down
7 changes: 6 additions & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,12 @@ type CachingStrategy =
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache
| SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string

type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, [<O; D(null)>]?access, [<O; D(null)>]?caching) =
type GesResolver<'event,'state>
( gateway : GesGateway, codec, fold, initial,
/// Caching can be overkill for EventStore esp considering the degree to which its intrinsic caching is a first class feature
/// e.g., A key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load
[<O; D(null)>]?caching,
[<O; D(null)>]?access) =
do match access with
| Some (AccessStrategy.EventsAreState) when Option.isSome caching ->
"Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)"
Expand Down
12 changes: 6 additions & 6 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,32 @@ module Cart =
let codec = genCodec<Domain.Cart.Events.Event>()
let createServiceWithoutOptimization connection batchSize log =
let store = createCosmosStore connection batchSize
let resolveStream = CosmosResolver(store, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.Cart.Service(log, resolveStream)
let createServiceWithoutOptimizationAndMaxItems connection batchSize maxEventsPerSlice log =
let store = createCosmosStoreWithMaxEventsPerSlice connection batchSize maxEventsPerSlice
let resolveStream = CosmosResolver(store, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.Cart.Service(log, resolveStream)
let projection = "Compacted",snd snapshot
let createServiceWithProjection connection batchSize log =
let store = createCosmosStore connection batchSize
let resolveStream = CosmosResolver(store, codec, fold, initial, AccessStrategy.Snapshot snapshot).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
Backend.Cart.Service(log, resolveStream)
let createServiceWithProjectionAndCaching connection batchSize log cache =
let store = createCosmosStore connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream = CosmosResolver(store, codec, fold, initial, AccessStrategy.Snapshot snapshot, sliding20m).Resolve
let resolveStream = CosmosResolver(store, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve
Backend.Cart.Service(log, resolveStream)

module ContactPreferences =
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate =
let gateway = createGateway defaultBatchSize
let resolveStream = CosmosResolver(gateway, codec, fold, initial).Resolve
let resolveStream = CosmosResolver(gateway, codec, fold, initial, CachingStrategy.NoCaching).Resolve
Backend.ContactPreferences.Service(log, resolveStream)
let createService createGateway log =
let resolveStream = CosmosResolver(createGateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Resolve
let resolveStream = CosmosResolver(createGateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.AnyKnownEventType).Resolve
Backend.ContactPreferences.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down
8 changes: 4 additions & 4 deletions tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ module Cart =
let createServiceWithoutOptimization log gateway =
Backend.Cart.Service(log, GesResolver(gateway, codec, fold, initial).Resolve)
let createServiceWithCompaction log gateway =
let resolveStream = GesResolver(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Resolve
let resolveStream = GesResolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
Backend.Cart.Service(log, resolveStream)
let createServiceWithCaching log gateway cache =
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
Backend.Cart.Service(log, GesResolver(gateway, codec, fold, initial, caching = sliding20m).Resolve)
Backend.Cart.Service(log, GesResolver(gateway, codec, fold, initial, sliding20m).Resolve)
let createServiceWithCompactionAndCaching log gateway cache =
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
Backend.Cart.Service(log, GesResolver(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, sliding20m).Resolve)
Backend.Cart.Service(log, GesResolver(gateway, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve)

module ContactPreferences =
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
Expand All @@ -46,7 +46,7 @@ module ContactPreferences =
let gateway = createGesGateway connection defaultBatchSize
Backend.ContactPreferences.Service(log, GesResolver(gateway, codec, fold, initial).Resolve)
let createService log connection =
let resolveStream = GesResolver(createGesGateway connection 1, codec, fold, initial, AccessStrategy.EventsAreState).Resolve
let resolveStream = GesResolver(createGesGateway connection 1, codec, fold, initial, access = AccessStrategy.EventsAreState).Resolve
Backend.ContactPreferences.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down

0 comments on commit 365f8ef

Please sign in to comment.