From 9869cb98cec75d2e82ca97b412ad4d8482e00ef2 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 4 Mar 2022 15:28:05 +0000 Subject: [PATCH] CosmosStore: Switch to JsonElement bodies (#305) --- CHANGELOG.md | 3 + azure-pipelines.yml | 4 - samples/Infrastructure/Services.fs | 3 +- samples/Store/Domain/Cart.fs | 1 + samples/Store/Domain/ContactPreferences.fs | 1 + samples/Store/Domain/Domain.fsproj | 1 + samples/Store/Domain/Favorites.fs | 1 + samples/Store/Domain/Infrastructure.fs | 6 +- samples/Store/Integration/CartIntegration.fs | 5 +- .../ContactPreferencesIntegration.fs | 7 +- .../Store/Integration/FavoritesIntegration.fs | 9 +- samples/Tutorial/Gapless.fs | 2 +- samples/Tutorial/Index.fs | 2 +- samples/Tutorial/Sequence.fs | 2 +- samples/Tutorial/Set.fs | 2 +- samples/Tutorial/Tutorial.fsproj | 2 + samples/Tutorial/Upload.fs | 3 +- src/Equinox.CosmosStore/CosmosStore.fs | 125 ++++++------------ .../CosmosStoreSerialization.fs | 57 ++++++++ .../Equinox.CosmosStore.fsproj | 4 +- .../AccessStrategies.fs | 2 +- .../CosmosCoreIntegration.fs | 7 +- .../CosmosIntegration.fs | 4 +- .../JsonConverterTests.fs | 24 ++-- 24 files changed, 156 insertions(+), 121 deletions(-) create mode 100644 src/Equinox.CosmosStore/CosmosStoreSerialization.fs diff --git a/CHANGELOG.md b/CHANGELOG.md index ecaabad3f..384f2c9cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ The `Unreleased` section name is replaced by the expected version of next releas - `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313) - `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310) +- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) +- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) +- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) - `SqlStreamStore`.*: Target `SqlStreamStore` v `1.2.0` (`Postgres` remains at `1.2.0-beta.8` as that's the last released version) [#227](https://github.com/jet/equinox/pull/227) :pray: [@rajivhost](https://github.com/rajivhost) - Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b4d9c5f08..369ffcdc9 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,8 +1,4 @@ name: $(Rev:r) -trigger: -- master -- main -- refs/tags/* jobs: - job: Windows pool: diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index 400b5a835..8f104f840 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -1,6 +1,7 @@ module Samples.Infrastructure.Services open Domain +open FsCodec.SystemTextJson open Microsoft.Extensions.DependencyInjection open System @@ -13,7 +14,7 @@ type StreamResolver(storage) = match storage with | Storage.StorageConfig.Cosmos (store, caching, unfolds) -> let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized - Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve + Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve | Storage.StorageConfig.Es (context, caching, unfolds) -> let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None Equinox.EventStore.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index bafc7d0dc..da2a13fda 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -25,6 +25,7 @@ module Events = | ItemPropertiesChanged of ItemPropertiesChangedInfo interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codecStj = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Store/Domain/ContactPreferences.fs b/samples/Store/Domain/ContactPreferences.fs index c5f3200f7..0242ac78c 100644 --- a/samples/Store/Domain/ContactPreferences.fs +++ b/samples/Store/Domain/ContactPreferences.fs @@ -13,6 +13,7 @@ module Events = | []Updated of Value interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codecStj = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj index 6b9f135c5..3ea73d6ed 100644 --- a/samples/Store/Domain/Domain.fsproj +++ b/samples/Store/Domain/Domain.fsproj @@ -22,6 +22,7 @@ + diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index 16a6923d8..6f61673a8 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -15,6 +15,7 @@ module Events = | Unfavorited of Unfavorited interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codecStj = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Store/Domain/Infrastructure.fs b/samples/Store/Domain/Infrastructure.fs index 0bb09a774..43db9b05e 100644 --- a/samples/Store/Domain/Infrastructure.fs +++ b/samples/Store/Domain/Infrastructure.fs @@ -32,7 +32,7 @@ module Guid = /// - Ensures canonical rendering without dashes via ToString + Newtonsoft.Json /// - Guards against XSS by only permitting initialization based on Guid.Parse /// - Implements comparison/equality solely to enable tests to leverage structural equality -[)>] +[); System.Text.Json.Serialization.JsonConverter(typeof)>] type SkuId private (id : string) = inherit StringId(id) new(value : Guid) = SkuId(value.ToString "N") @@ -45,6 +45,10 @@ and private SkuIdJsonConverter() = override __.Pickle value = string value /// Input must be a `Guid.Parse`able value override __.UnPickle input = Guid.Parse input |> SkuId +and private SkuIdJsonConverterStj() = + inherit FsCodec.SystemTextJson.JsonIsomorphism() + override _.Pickle value = string value + override _.UnPickle input = Guid.Parse input |> SkuId /// RequestId strongly typed id, represented internally as a string /// - Ensures canonical rendering without dashes via ToString, Newtonsoft.Json, sprintf "%s" etc diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 5765421c2..4c0bf3b44 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -15,6 +15,7 @@ let createServiceMemory log store = Cart.create log (fun (id,opt) -> MemoryStore.MemoryStoreCategory(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) let codec = Cart.Events.codec +let codecStj = Cart.Events.codecStj let resolveGesStreamWithRollingSnapshots context = fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) @@ -22,9 +23,9 @@ let resolveGesStreamWithoutCustomAccessStrategy context = fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial).Resolve(id,?option=opt) let resolveCosmosStreamWithSnapshotStrategy context = - fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) + fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) let resolveCosmosStreamWithoutCustomAccessStrategy context = - fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt) + fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt) let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count = service.ExecuteManyAsync(cartId, false, seq { diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 545b609f6..734573984 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -14,18 +14,19 @@ let createServiceMemory log store = ContactPreferences.create log (MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) let codec = ContactPreferences.Events.codec +let codecStj = ContactPreferences.Events.codecStj let resolveStreamGesWithOptimizedStorageSemantics context = EventStore.EventStoreCategory(context 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve let resolveStreamGesWithoutAccessStrategy context = EventStore.EventStoreCategory(context defaultBatchSize, codec, fold, initial).Resolve let resolveStreamCosmosWithLatestKnownEventSemantics context = - CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.LatestKnownEvent).Resolve + CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.LatestKnownEvent).Resolve let resolveStreamCosmosUnoptimized context = - CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve + CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve let resolveStreamCosmosRollingUnfolds context = let access = CosmosStore.AccessStrategy.Custom(ContactPreferences.Fold.isOrigin, ContactPreferences.Fold.transmute) - CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, access).Resolve + CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, access).Resolve type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 31d0f822f..8e0604f47 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -14,18 +14,19 @@ let createMemoryStore () = MemoryStore.VolatileStore<_>() let createServiceMemory log store = Favorites.create log (MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) -let codec = Domain.Favorites.Events.codec +let codec = Favorites.Events.codec +let codecStj = Favorites.Events.codecStj let createServiceGes log context = let cat = EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot) Favorites.create log cat.Resolve let createServiceCosmosSnapshotsUncached log context = - let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot) + let cat = CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot) Favorites.create log cat.Resolve let createServiceCosmosRollingStateUncached log context = let access = CosmosStore.AccessStrategy.RollingState Favorites.Fold.snapshot - let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, access) + let cat = CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, CosmosStore.CachingStrategy.NoCaching, access) Favorites.create log cat.Resolve let createServiceCosmosUnoptimizedButCached log context = @@ -33,7 +34,7 @@ let createServiceCosmosUnoptimizedButCached log context = let caching = let cache = Equinox.Cache ("name", 10) CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, caching, access) + let cat = CosmosStore.CosmosStoreCategory(context, codecStj, fold, initial, caching, access) Favorites.create log cat.Resolve type Command = diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs index ac1e540f6..7bab2b357 100644 --- a/samples/Tutorial/Gapless.fs +++ b/samples/Tutorial/Gapless.fs @@ -18,7 +18,7 @@ module Events = | Released of Item | Snapshotted of Snapshotted interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codec = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Tutorial/Index.fs b/samples/Tutorial/Index.fs index 91840f477..3ee5e51a6 100644 --- a/samples/Tutorial/Index.fs +++ b/samples/Tutorial/Index.fs @@ -13,7 +13,7 @@ module Events = | Deleted of ItemIds | Snapshotted of Items<'v> interface TypeShape.UnionContract.IUnionContract - let codec<'v> = FsCodec.NewtonsoftJson.Codec.Create>() + let codec<'v> = FsCodec.SystemTextJson.Codec.Create>() module Fold = diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs index 387c6ae3c..493d1c6b0 100644 --- a/samples/Tutorial/Sequence.fs +++ b/samples/Tutorial/Sequence.fs @@ -14,7 +14,7 @@ module Events = type Event = | Reserved of Reserved interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codec = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs index 30039753b..453f7b5b8 100644 --- a/samples/Tutorial/Set.fs +++ b/samples/Tutorial/Set.fs @@ -12,7 +12,7 @@ module Events = | Deleted of Items | Snapshotted of Items interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codec = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj index cb6003b34..efc05d368 100644 --- a/samples/Tutorial/Tutorial.fsproj +++ b/samples/Tutorial/Tutorial.fsproj @@ -28,6 +28,8 @@ + + diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index 2a24f8d87..4430fd812 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -30,6 +30,7 @@ module Events = | IdAssigned of IdAssigned interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codecStj = FsCodec.SystemTextJson.Codec.Create() module Fold = @@ -62,7 +63,7 @@ module Cosmos = open Equinox.CosmosStore let create (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let category = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent) + let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent) create category.Resolve module EventStore = diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index a145cddfc..a0e456fa6 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -4,15 +4,15 @@ open Equinox.Core open FsCodec open FSharp.Control open Microsoft.Azure.Cosmos -open Newtonsoft.Json open Serilog open System +open System.Text.Json -type EventBody = byte[] +type EventBody = JsonElement /// A single Domain Event from the array held in a Batch -type [] - Event = +[] +type Event = // TODO for STJ v5: All fields required unless explicitly optional { /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) t: DateTimeOffset // ISO 8601 @@ -20,22 +20,17 @@ type [] c: string // required /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB - [)>] - [] - d: EventBody // Required, but can be null so Nullary cases can work + d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work - /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) - [)>] - [] - m: EventBody + /// Optional metadata, as UTF-8 encoded json, ready to emit directly + m: EventBody // TODO for STJ v5: Optional, not serialized if missing - /// Optional correlationId (can be null, not written if missing) - [] - correlationId : string + /// Optional correlationId + correlationId : string // TODO for STJ v5: Optional, not serialized if missing - /// Optional causationId (can be null, not written if missing) - [] - causationId : string } + /// Optional causationId + causationId : string // TODO for STJ v5: Optional, not serialized if missing + } interface IEventData with member x.EventType = x.c @@ -47,12 +42,11 @@ type [] member x.Timestamp = x.t /// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds) -type [] - Batch = +[] +type Batch = // TODO for STJ v5: All fields required unless explicitly optional { /// CosmosDB-mandated Partition Key, must be maintained within the document /// Not actually required if running in single partition mode, but for simplicity, we always write it - [] // Not requested in queries - p: string // "{streamName}" + p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries /// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it @@ -62,8 +56,7 @@ type [] /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - [] - _etag: string + _etag: string // TODO for STJ v5: Optional, not serialized if missing /// base 'i' value for the Events held herein i: int64 // {index} @@ -91,65 +84,31 @@ type Unfold = /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization c: string // required - /// UTF-8 JSON OR Event body - Json -> UTF-8 -> Deflate -> Base64 - [)>] - d: byte[] // required + /// Event body - Json -> Deflate -> Base64 -> JsonElement + [)>] + d: EventBody // required /// Optional metadata, same encoding as `d` (can be null; not written if missing) - [)>] - [] - m: EventBody } // optional - -/// Transparently encodes/decodes fields that can optionally by compressed by -/// 1. Writing outgoing values (which may be JSON string, JSON object, or null) from a UTF-8 JSON array representation as per VerbatimUtf8Converter -/// 2a. Decoding incoming JSON String values by Decompressing it to a UTF-8 JSON array representation -/// 2b. Decoding incoming JSON non-string values by reading the raw value directly into a UTF-8 JSON array as per VerbatimUtf8Converter -and Base64MaybeDeflateUtf8JsonConverter() = - inherit JsonConverter() - let inflate str : byte[] = - let compressedBytes = System.Convert.FromBase64String str - use input = new System.IO.MemoryStream(compressedBytes) - use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress) - use output = new System.IO.MemoryStream() - decompressor.CopyTo(output) - output.ToArray() - static member Compress(input : byte[]) : byte[] = - if input = null || input.Length = 0 then null else - - use output = new System.IO.MemoryStream() - use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) - compressor.Write(input, 0, input.Length) - compressor.Close() - String.Concat("\"", System.Convert.ToBase64String(output.ToArray()), "\"") - |> System.Text.Encoding.UTF8.GetBytes - override _.CanConvert(objectType) = - typeof.Equals(objectType) - override _.ReadJson(reader, _, _, serializer) = - match reader.TokenType with - | JsonToken.Null -> null - | JsonToken.String -> serializer.Deserialize(reader, typedefof) :?> string |> inflate |> box - | _ -> Newtonsoft.Json.Linq.JToken.Load reader |> string |> System.Text.Encoding.UTF8.GetBytes |> box - override _.WriteJson(writer, value, serializer) = - let array = value :?> byte[] - if array = null || array.Length = 0 then serializer.Serialize(writer, null) - else System.Text.Encoding.UTF8.GetString array |> writer.WriteRawValue + [)>] + m: EventBody // TODO for STJ v5: Optional, not serialized if missing + } /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`) /// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc -type [] - Tip = - { [] // Not requested in queries +[] +type Tip = // TODO for STJ v5: All fields required unless explicitly optional + { /// Partition key, as per Batch - p: string // "{streamName}" + p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries + /// Document Id within partition, as per Batch id: string // "{-1}" - Well known IdConstant used while this remains the pending batch /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - [] - _etag: string + _etag: string // TODO for STJ v5: Optional, not serialized if missing /// base 'i' value for the Events held herein i: int64 @@ -250,7 +209,7 @@ module Log = | Trim of Measurement let internal prop name value (log : ILogger) = log.ForContext(name, value) let internal propData name (events : #IEventData seq) (log : ILogger) = - let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes + let render (body : EventBody) = body.GetRawText() let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) let internal propEvents = propData "events" @@ -272,9 +231,9 @@ module Log = let enrich (e : Serilog.Events.LogEvent) = e.AddPropertyIfAbsent(Serilog.Events.LogEventProperty(PropertyTag, Serilog.Events.ScalarValue(value))) log.ForContext({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt,_) = enrich evt }) - let internal (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length - let internal (|EventLen|) (x : #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes + metaBytes + 80 - let internal (|BatchLen|) = Seq.sumBy (|EventLen|) + let internal (|BlobLen|) (x : EventBody) = if x.ValueKind = JsonValueKind.Null then 0 else x.GetRawText().Length + let internal eventLen (x: #IEventData<_>) = let BlobLen bytes, BlobLen metaBytes = x.Data, x.Meta in bytes + metaBytes + 80 + let internal batchLen = Seq.sumBy eventLen let internal (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function | :? Serilog.Events.ScalarValue as x -> Some x.Value | _ -> None @@ -521,7 +480,7 @@ module internal Sync = : Async = async { let! t, (ru, result) = run (container, stream) (maxEventsInTip, maxStringifyLen) (exp, req) |> Stopwatch.Time let verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug - let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length + let count, bytes = req.e.Length, if verbose then Enum.Events req |> Log.batchLen else 0 let log = let inline mkMetric ru : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = stream; interval = t; bytes = bytes; count = count; ru = ru } @@ -548,7 +507,7 @@ module internal Sync = Log.withLoggedRetries retryPolicy "writeAttempt" call log let private mkEvent (e : IEventData<_>) = - { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } + { t = e.Timestamp; c = e.EventType; d = JsonHelper.fixup e.Data; m = JsonHelper.fixup e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } let mkBatch (stream : string) (events : IEventData<_>[]) unfolds : Tip = { p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null e = Array.map mkEvent events; u = Array.ofSeq unfolds } @@ -654,7 +613,7 @@ module internal Tip = (log 0 0 Log.Metric.TipNotFound).Information("EqxCosmos {action:l} {stream} {res} {ms}ms rc={ru}", "Tip", stream, 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.Found tip -> let log = - let (Log.BatchLen bytes), count = Enum.Unfolds tip.u, tip.u.Length + let count, bytes = tip.u.Length, if verbose then Enum.Unfolds tip.u |> Log.batchLen else 0 log bytes count Log.Metric.Tip let log = if verbose then log |> Log.propDataUnfolds tip.u else log let log = match maybePos with Some p -> log |> Log.propStartPos p |> Log.propStartEtag p | None -> log @@ -683,7 +642,7 @@ module internal Query = if query.HasMoreResults then yield! loop (i + 1) } // earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it - use __ = query // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable + use _ = query // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable loop 0 let private mkQuery (log : ILogger) (container : Container, stream : string) includeTip maxItems (direction : Direction, minIndex, maxIndex) : FeedIterator = let order = if direction = Direction.Forward then "ASC" else "DESC" @@ -718,7 +677,7 @@ module internal Query = |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id let events = batches |> Seq.collect unwrapBatch |> Array.ofSeq let verbose = log.IsEnabled Events.LogEventLevel.Debug - let (Log.BatchLen bytes), count = events, events.Length + let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0 let reqMetric : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = t; bytes = bytes; count = count; ru = ru } let log = let evt = Log.Metric.QueryResponse (direction, reqMetric) in log |> Log.event evt let log = if verbose then log |> Log.propEvents events else log @@ -732,7 +691,8 @@ module internal Query = events, maybePosition, ru let private logQuery direction queryMaxItems (container : Container, streamName) interval (responsesCount, events : ITimelineEvent[]) n (ru : float) (log : ILogger) = - let (Log.BatchLen bytes), count = events, events.Length + let verbose = log.IsEnabled Events.LogEventLevel.Debug + let count, bytes = events.Length, if verbose then events |> Log.batchLen else 0 let reqMetric : Log.Measurement = { database = container.Database.Id; container = container.Id; stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let evt = Log.Metric.Query (direction, responsesCount, reqMetric) let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB" @@ -744,7 +704,7 @@ module internal Query = let mutable used, dropped = 0, 0 let mutable found = false for x in xs do - let (Log.EventLen bytes) = x + let bytes = Log.eventLen x if found then dropped <- dropped + bytes else used <- used + bytes if x.Index = stopIndex then found <- true @@ -1143,7 +1103,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let events', unfolds = transmute events state' SyncExp.Etag (defaultArg pos.etag null), events', Seq.map encode events' |> Array.ofSeq, Seq.map encode unfolds let baseIndex = pos.index + int64 (List.length events) - let compressor = if compressUnfolds then Base64MaybeDeflateUtf8JsonConverter.Compress else id + let compressor = if compressUnfolds then JsonCompressedBase64Converter.Compress else JsonHelper.fixup let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections match! store.Sync(log, stream, exp, batch) with @@ -1244,7 +1204,8 @@ type CosmosClientFactory CosmosClientOptions( MaxRetryAttemptsOnRateLimitedRequests = maxAttempts, MaxRetryWaitTimeOnRateLimitedRequests = maxWait, - RequestTimeout = timeout) + RequestTimeout = timeout, + Serializer = CosmosJsonSerializer(System.Text.Json.JsonSerializerOptions())) match mode with | None | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct | Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // only supports Https diff --git a/src/Equinox.CosmosStore/CosmosStoreSerialization.fs b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs new file mode 100644 index 000000000..b81a88eeb --- /dev/null +++ b/src/Equinox.CosmosStore/CosmosStoreSerialization.fs @@ -0,0 +1,57 @@ +namespace Equinox.CosmosStore.Core + +open Microsoft.Azure.Cosmos +open System.IO +open System.Text.Json +open System.Text.Json.Serialization + +module JsonHelper = + + let Null = JsonSerializer.SerializeToElement null + let fixup (e : JsonElement) = if e.ValueKind = JsonValueKind.Undefined then Null else e + +type CosmosJsonSerializer(options: JsonSerializerOptions) = + inherit CosmosSerializer() + + override _.FromStream<'T>(stream) = + use __ = stream + + if stream.Length = 0L then Unchecked.defaultof<'T> + elif typeof.IsAssignableFrom(typeof<'T>) then box stream :?> 'T + else JsonSerializer.Deserialize<'T>(stream, options) + + override _.ToStream<'T>(input: 'T) = + let memoryStream = new MemoryStream() + JsonSerializer.Serialize(memoryStream, input, input.GetType(), options) + memoryStream.Position <- 0L + memoryStream :> Stream + +/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc +/// Only applied to snapshots in the Tip +and JsonCompressedBase64Converter() = + inherit JsonConverter() + + static member Compress(value: JsonElement) = + if value.ValueKind = JsonValueKind.Undefined then JsonHelper.Null + elif value.ValueKind = JsonValueKind.Null then value + else + let input = System.Text.Encoding.UTF8.GetBytes(value.GetRawText()) + use output = new MemoryStream() + use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) + compressor.Write(input, 0, input.Length) + compressor.Close() + JsonSerializer.Deserialize("\"" + System.Convert.ToBase64String(output.ToArray()) + "\"") + + override _.Read(reader, _typeToConvert, options) = + if reader.TokenType <> JsonTokenType.String then + JsonSerializer.Deserialize(&reader, options) + else + let compressedBytes = reader.GetBytesFromBase64() + use input = new MemoryStream(compressedBytes) + use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress) + use output = new MemoryStream() + decompressor.CopyTo(output) + JsonSerializer.Deserialize(System.ReadOnlySpan.op_Implicit(output.ToArray()), options) + + override _.Write(writer, value, options) = + JsonSerializer.Serialize(writer, value, options) diff --git a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj index 157b134f7..2f4714ba0 100644 --- a/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj +++ b/src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj @@ -10,6 +10,7 @@ + @@ -23,9 +24,10 @@ - + + diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index 73b64d3f5..edf925529 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -33,7 +33,7 @@ module SequenceCheck = type Event = | Add of {| value : int |} interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + let codec = FsCodec.SystemTextJson.Codec.Create() module Fold = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index 2da11114f..5d48fb4e4 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -14,10 +14,11 @@ open System.Text type TestEvents() = static member private Create(i, ?eventType, ?json) = + let ser = System.Text.Json.JsonSerializer.SerializeToElement EventData.FromUtf8Bytes ( sprintf "%s:%d" (defaultArg eventType "test_event") i, - Encoding.UTF8.GetBytes(defaultArg json "{\"d\":\"d\"}"), - Encoding.UTF8.GetBytes "{\"m\":\"m\"}") + ser (defaultArg json "{\"d\":\"d\"}"), + ser "{\"m\":\"m\"}") static member Create(i, c) = Array.init c (fun x -> TestEvents.Create(x+i)) type Tests(testOutputHelper) = @@ -69,7 +70,7 @@ type Tests(testOutputHelper) = } let blobEquals (x: byte[]) (y: byte[]) = System.Linq.Enumerable.SequenceEqual(x,y) - let stringOfEventBody (x: byte[]) = Encoding.UTF8.GetString(x) + let stringOfEventBody (x : System.Text.Json.JsonElement) = x.GetRawText() let xmlDiff (x: string) (y: string) = match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x,JToken.Parse y) with | null -> "" diff --git a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs index 551e6bbf9..b804af1d0 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs @@ -11,7 +11,7 @@ open System.Threading module Cart = let fold, initial = Cart.Fold.fold, Cart.Fold.initial let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot - let codec = Cart.Events.codec + let codec = Cart.Events.codecStj let createServiceWithoutOptimization log context = let resolve (id,opt) = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve(id,?option=opt) Cart.create log resolve @@ -34,7 +34,7 @@ module Cart = module ContactPreferences = let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial - let codec = ContactPreferences.Events.codec + let codec = ContactPreferences.Events.codecStj let createServiceWithoutOptimization createContext queryMaxItems log _ignoreWindowSize _ignoreCompactionPredicate = let context = createContext queryMaxItems let resolveStream = CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve diff --git a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs index 2eb8ca17a..fad94f645 100644 --- a/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs +++ b/tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs @@ -2,7 +2,6 @@ open Equinox.CosmosStore open FsCheck.Xunit -open Newtonsoft.Json open Swensen.Unquote open System open Xunit @@ -13,43 +12,44 @@ type Union = | B of Embedded interface TypeShape.UnionContract.IUnionContract -let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault() +let defaultOptions = FsCodec.SystemTextJson.Options.CreateDefault() type Base64ZipUtf8Tests() = - let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings) + let eventCodec = FsCodec.SystemTextJson.Codec.Create(defaultOptions) let ser eventType data = let e : Core.Unfold = { i = 42L c = eventType d = data - m = null + m = System.Text.Json.JsonSerializer.SerializeToElement null t = DateTimeOffset.MinValue } - JsonConvert.SerializeObject e + System.Text.Json.JsonSerializer.Serialize e [] let ``serializes, achieving expected compression`` () = let encoded = eventCodec.Encode(None,A { embed = String('x',5000) }) - let res = ser encoded.EventType (Core.Base64MaybeDeflateUtf8JsonConverter.Compress encoded.Data) + let res = ser encoded.EventType (Core.JsonCompressedBase64Converter.Compress encoded.Data) test <@ res.Contains("\"d\":\"") && res.Length < 138 @> [] let roundtrips compress value = let encoded = eventCodec.Encode(None, value) - let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id + let maybeCompressor = if compress then Core.JsonCompressedBase64Converter.Compress else id let actualData = maybeCompressor encoded.Data let ser = ser encoded.EventType actualData test <@ if compress then ser.Contains("\"d\":\"") else ser.Contains("\"d\":{") @> - let des = JsonConvert.DeserializeObject(ser) + let des = System.Text.Json.JsonSerializer.Deserialize(ser) let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d) let decoded = eventCodec.TryDecode d |> Option.get test <@ value = decoded @> [] let handlesNulls compress = - let maybeCompressor = if compress then Core.Base64MaybeDeflateUtf8JsonConverter.Compress else id - let maybeCompressed = maybeCompressor null + let maybeCompressor = if compress then Core.JsonCompressedBase64Converter.Compress else id + let nullElement = System.Text.Json.JsonSerializer.SerializeToElement null + let maybeCompressed = maybeCompressor nullElement let ser = ser "AnEventType" maybeCompressed - let des = JsonConvert.DeserializeObject(ser) - test <@ null = des.d @> + let des = System.Text.Json.JsonSerializer.Deserialize(ser) + test <@ System.Text.Json.JsonValueKind.Null = let d = des.d in d.ValueKind @>