Skip to content

Commit

Permalink
feat(CosmosStore): Decouple Compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 31, 2024
1 parent d0c0334 commit 5b27bea
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 235 deletions.
3 changes: 1 addition & 2 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

open Domain
open Equinox
open FsCodec.SystemTextJson.Interop // use ToJsonElementCodec because we are doing an overkill example
open Microsoft.Extensions.DependencyInjection
open System

Expand All @@ -18,7 +17,7 @@ type Store(store) =
MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
| Store.Config.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching)
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, FsCodec.SystemTextJson.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
| Store.Config.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized
DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching)
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.3" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.2" />

<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module EventCodec =

/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>()
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed

/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Gapless.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Events =
| Released of Item
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
let codec = EventCodec.genJsonElement<Event>

module Fold =

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Index.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Events =
| Deleted of ItemIds
| Snapshotted of Items<'v>
interface TypeShape.UnionContract.IUnionContract
let codec<'v> = FsCodec.SystemTextJson.CodecJsonElement.Create<Event<'v>>()
let codec<'v> = EventCodec.genJsonElement<Event<'v>>

module Fold =

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module EventCodec =

/// For CosmosStore - we encode to JsonElement as that's what the store talks
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>()
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() |> FsCodec.SystemTextJson.Compression.EncodeUncompressed

/// For stores other than CosmosStore, we encode to UTF-8 and have the store do the right thing
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Sequence.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Events =
type Event =
| Reserved of Reserved
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
let codec = EventCodec.genJsonElement<Event>

module Fold =

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Set.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Events =
| Deleted of Items
| Snapshotted of Items
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
let codec = EventCodec.genJsonElement<Event>

module Fold =

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.3" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.2" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="7.0.0" />
</ItemGroup>
Expand Down
184 changes: 92 additions & 92 deletions src/Equinox.CosmosStore/CosmosStore.fs

Large diffs are not rendered by default.

63 changes: 0 additions & 63 deletions src/Equinox.CosmosStore/CosmosStoreSerialization.fs

This file was deleted.

3 changes: 1 addition & 2 deletions src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<PackageValidationBaselineVersion>4.0.0</PackageValidationBaselineVersion>
<!-- <PackageValidationBaselineVersion>4.1.0</PackageValidationBaselineVersion>-->
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Equinox\Infrastructure.fs" />
<Compile Include="..\Equinox\LazyTask.fs" />
<Compile Include="..\Equinox.Core\TaskCell.fs" />
<Compile Include="..\Equinox.Core\Internal.fs" />
<Compile Include="CosmosStoreSerialization.fs" />
<Compile Include="CosmosStore.fs" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.CosmosStore.Integration/AccessStrategies.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module SequenceCheck =
#if STORE_DYNAMO
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Compression.EncodeTryCompress
#else
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>() |> FsCodec.SystemTextJson.Compression.EncodeTryCompress
#endif

module Fold =
Expand Down
17 changes: 9 additions & 8 deletions tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ open System

type TestEvents() =
static member private Create(i, ?eventType, ?json) =
let enc = System.Text.Json.JsonSerializer.SerializeToElement >> FsCodec.SystemTextJson.Compression.Encode
FsCodec.Core.EventData.Create
( sprintf "%s:%d" (defaultArg eventType "test_event") i,
System.Text.Json.JsonSerializer.SerializeToElement (defaultArg json "{\"d\":\"d\"}"),
System.Text.Json.JsonSerializer.SerializeToElement "{\"m\":\"m\"}")
enc (defaultArg json "{\"d\":\"d\"}"),
enc "{\"m\":\"m\"}")
static member Create(i, c) = Array.init c (fun x -> TestEvents.Create(x + i))

type Tests(testOutputHelper) =
Expand Down Expand Up @@ -64,14 +65,14 @@ type Tests(testOutputHelper) =
test <@ match res with Choice2Of2 (:? InvalidOperationException as ex) -> ex.Message.StartsWith "Must write either events or unfolds." | x -> failwith $"%A{x}" @>
}
let stringOfEventBody (x : System.Text.Json.JsonElement) = x.GetRawText()
let xmlDiff (x: string) (y: string) =
match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x,JToken.Parse y) with
let stringOfEncodedBody (x: Equinox.CosmosStore.Core.EncodedBody) = FsCodec.SystemTextJson.Compression.DecodeToJsonElement(x).GetRawText()
let jsonDiff (x: string) (y: string) =
match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x, JToken.Parse y) with
| null -> ""
| d -> string d
let verifyUtf8JsonEquals i x y =
let sx,sy = stringOfEventBody x, stringOfEventBody y
test <@ ignore i; x = y || "" = xmlDiff sx sy @>
let sx,sy = stringOfEncodedBody x, stringOfEncodedBody y
test <@ ignore i; x = y || "" = jsonDiff sx sy @>
let add6EventsIn2BatchesEx ctx streamName splitAt = async {
let index = 0L
Expand All @@ -87,7 +88,7 @@ type Tests(testOutputHelper) =
let add6EventsIn2Batches ctx streamName = add6EventsIn2BatchesEx ctx streamName 1
let verifyCorrectEventsEx direction baseIndex (expected: IEventData<_>[]) (xs: ITimelineEvent<_>[]) =
let verifyCorrectEventsEx direction baseIndex (expected: IEventData<Equinox.CosmosStore.Core.EncodedBody>[]) (xs: ITimelineEvent<Equinox.CosmosStore.Core.EncodedBody>[]) =
let xs, baseIndex =
if direction = Direction.Forward then xs, baseIndex
else Array.rev xs, baseIndex - int64 (Array.length expected) + 1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<Compile Include="DocumentStoreIntegration.fs" />
<Compile Include="AccessStrategies.fs" />
<Compile Include="CosmosCoreIntegration.fs" />
<Compile Include="JsonConverterTests.fs" />
<Compile Include="FsCodecCompressionTests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
68 changes: 68 additions & 0 deletions tests/Equinox.CosmosStore.Integration/FsCodecCompressionTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Prior to version v 4.1.0, CosmosStore owned:
// - compression of snapshots (and APIs controlling conditionally of that)
// - inflation of snapshots
// This is now an external concern, fully implemented by APIs presented in FsCodec.SystemTextJson.Compression V 3.1.0 and later
// These tests are a sanity check pinning the basic mechanisms that are now externalized; any more thorough tests should be maintained in FsCodec
// NOTE there is no strong dependency on FsCodec; CosmosStore is happy to roundtrip arbitrary pairs of D/d and M/m values
// However, it is recommended to use that implementation as it provides for interop with (Deflate-compressed) snapshots as written by CosmosStore pre 4.1.0
// NOTE prior to v 4.1.0, CosmosStore provided a System.Text.Json integration for Microsoft.Azure.Cosmos
// Version 4.1.0 and later lean on the integrated support provided from Microsoft.Azure.Cosmos v 3.43.0 onward
module Equinox.CosmosStore.Integration.FsCodecCompressionTests

open Equinox.CosmosStore
open FsCheck.Xunit
open Swensen.Unquote
open System
open Xunit

type Embedded = { embed : string }
type Union =
| A of Embedded
| B of Embedded
interface TypeShape.UnionContract.IUnionContract

type CoreBehaviors() =
let defaultOptions = System.Text.Json.JsonSerializerOptions.Default // Rule out dependencies on any FsCodec conversions
let eventCodec = FsCodec.SystemTextJson.CodecJsonElement.Create(defaultOptions)
let nullElement = System.Text.Json.JsonSerializer.SerializeToElement null

let ser_ et struct (D, d) =
let e: Core.Unfold =
{ i = 42L
c = et
d = d
D = D
m = nullElement
M = Unchecked.defaultof<int>
t = DateTimeOffset.MinValue }
System.Text.Json.JsonSerializer.Serialize e
let ser (e: FsCodec.IEventData<Equinox.CosmosStore.Core.EncodedBody>) = ser_ e.EventType e.Data

[<Fact>]
let ``serializes, achieving expected compression`` () =
let encoded = eventCodec |> FsCodec.SystemTextJson.Compression.EncodeTryCompress |> _.Encode((), A { embed = String('x',5000) })
let res = ser encoded
test <@ res.Contains "\"d\":\"" && res.Length < 138 && res.Contains "\"D\":2" @>

let codec compress =
let forceCompression: FsCodec.SystemTextJson.CompressionOptions = { minSize = 0; minGain = -1000 }
if compress then FsCodec.SystemTextJson.Compression.EncodeTryCompress(eventCodec, forceCompression)
else FsCodec.SystemTextJson.Compression.EncodeUncompressed eventCodec

[<Property>]
let roundtrips compress value =
let codec = codec compress
let encoded = codec.Encode((), value)
let ser = ser encoded
test <@ if compress then ser.Contains("\"d\":\"")
else ser.Contains("\"d\":{") @>
let des = System.Text.Json.JsonSerializer.Deserialize<Core.Unfold>(ser)
let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, struct (des.D, des.d))
let decoded = codec.Decode d |> ValueOption.get
test <@ value = decoded @>

[<Fact>]
let handlesNulls () =
let ser = ser_ "et" (0, nullElement)
let des = System.Text.Json.JsonSerializer.Deserialize<Core.Unfold>(ser)
test <@ System.Text.Json.JsonValueKind.Null = let d = des.d in d.ValueKind @>
54 changes: 0 additions & 54 deletions tests/Equinox.CosmosStore.Integration/JsonConverterTests.fs

This file was deleted.

8 changes: 4 additions & 4 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ module CosmosQuery =
let inline miB x = float x / 1024. / 1024.
let private unixEpoch = DateTime.UnixEpoch
type System.Text.Json.JsonElement with
member x.Size = if x.ValueKind = System.Text.Json.JsonValueKind.Null then 0 else x.GetRawText().Length
member x.Utf8ByteCount = if x.ValueKind = System.Text.Json.JsonValueKind.Null then 0 else x.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount
type System.Text.Json.JsonDocument with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x.RootElement)
member x.Timestamp =
Expand Down Expand Up @@ -406,7 +406,7 @@ module CosmosQuery =
try while query.HasMoreResults do
sw.Restart()
let! page = query.ReadNextAsync(CancellationToken.None) |> Async.AwaitTaskCorrect
let pageSize = page.Resource |> Seq.sumBy _.RootElement.Size
let pageSize = page.Resource |> Seq.sumBy _.RootElement.Utf8ByteCount
let newestAge = page.Resource |> Seq.choose _.Timestamp |> Seq.tryLast |> Option.map (fun ts -> ts - DateTime.UtcNow)
let items = [| for x in page.Resource -> x.Cast<Equinox.CosmosStore.Core.Tip>() |]
let inline arrayLen x = if isNull x then 0 else Array.length x
Expand Down Expand Up @@ -484,14 +484,14 @@ module Dump =

let initial = List.empty
let fold state events = (events, state) ||> Seq.foldBack (fun e l -> e :: l)
let tryDecode (x : FsCodec.ITimelineEvent<ReadOnlyMemory<byte>>) = ValueSome x
let tryDecode (x: FsCodec.ITimelineEvent<ReadOnlyMemory<byte>>) = ValueSome x
let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ _ -> failwith "No mapCausation"))
let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required"
let formatUnfolds, formatEvents =
if p.Contains FlattenUnfolds then id else prettifyJson
, if p.Contains Pretty then prettifyJson else id
let mutable payloadBytes = 0
let render format (data : ReadOnlyMemory<byte>) =
let render format (data: ReadOnlyMemory<byte>) =
payloadBytes <- payloadBytes + data.Length
if data.IsEmpty then null
elif not doS then $"%6d{data.Length}b"
Expand Down

0 comments on commit 5b27bea

Please sign in to comment.