Skip to content

Commit

Permalink
Fix eqx dump handling of null values (#319)
Browse files Browse the repository at this point in the history
Switched from Newtonsoft to STJ
  • Loading branch information
bartelink authored Mar 10, 2022
1 parent 40eaa2c commit 0767c57
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 47 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `eqx dump`/`Equinox.Tool`: Add `-F` option to opt out of pretty printing unfolds [#319](https://github.com/jet/equinox/pull/319)
- `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314)

### Changed

- `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313)
- `eqx dump`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing for events [#313](https://github.com/jet/equinox/pull/313)
- `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
Expand All @@ -32,6 +33,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Fixed

- `eqx dump`/`Equinox.Tool`: Use `System.Text.Json` for pretty printing to handle `null` values correctly [#319](https://github.com/jet/equinox/pull/319)
- `EventStore`: Revise test rig to target a Docker-hosted cluster [#317](https://github.com/jet/equinox/pull/317)
- `EventStore/SqlStreamStore`: rename `Equinox.XXXStore.Log.Event` -> `Metric` to match `CosmosStore` [#311](https://github.com/jet/equinox/pull/311)
- `SqlStreamStore`: Fix `Metric` key to be `ssEvt` (was `esEvt`) [#311](https://github.com/jet/equinox/pull/311)
Expand Down
11 changes: 0 additions & 11 deletions tools/Equinox.Tool/Equinox.Tool.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
<WarningLevel>5</WarningLevel>
<IsTestProject>false</IsTestProject>

<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>

<PackageId>Equinox.Tool</PackageId>
Expand Down Expand Up @@ -36,16 +35,6 @@

<ItemGroup>
<PackageReference Include="MinVer" Version="2.5.0" PrivateAssets="All" />

<!-- NOTE cannot be 4.7.0 as Async.Sequential is broken-->
<PackageReference Include="FSharp.Core" Version="4.7.1" />

<!-- Disambigurate to remove warning-->
<PackageReference Include="System.Configuration.ConfigurationManager" Version="6.0.0" />
<PackageReference Include="System.Security.Cryptography.Cng" Version="5.0.0" />

<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.1.1" />
</ItemGroup>

<!-- workaround for not being able to make Domain inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
Expand Down
76 changes: 41 additions & 35 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ and [<NoComparison; NoEquality>]DumpArguments =
| [<AltCommandLine "-C"; Unique>] Correlation
| [<AltCommandLine "-J"; Unique>] JsonSkip
| [<AltCommandLine "-P"; Unique>] Pretty
| [<AltCommandLine "-F"; Unique>] FlattenUnfolds
| [<AltCommandLine "-T"; Unique>] TimeRegular
| [<AltCommandLine "-U"; Unique>] UnfoldsOnly
| [<AltCommandLine "-E"; Unique >] EventsOnly
Expand All @@ -105,6 +106,7 @@ and [<NoComparison; NoEquality>]DumpArguments =
| Correlation -> "Include Correlation/Causation identifiers"
| JsonSkip -> "Don't attempt to decode JSON"
| Pretty -> "Pretty print the JSON over multiple lines"
| FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds"
| TimeRegular -> "Don't humanize time intervals between events"
| UnfoldsOnly -> "Exclude Events. Default: show both Events and Unfolds"
| EventsOnly -> "Exclude Unfolds/Snapshots. Default: show both Events and Unfolds."
Expand All @@ -122,7 +124,7 @@ and DumpInfo(args: ParseResults<DumpArguments>) =
storeLog, Storage.Cosmos.config log storeConfig (Storage.Cosmos.Info sargs)
| Some (DumpArguments.Es sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore
storeLog, Storage.EventStore.config (log,storeLog) storeConfig sargs
storeLog, Storage.EventStore.config (log, storeLog) storeConfig sargs
| Some (DumpArguments.MsSql sargs) ->
let storeLog = createStoreLog false
storeLog, Storage.Sql.Ms.config log storeConfig sargs
Expand Down Expand Up @@ -175,10 +177,10 @@ and TestInfo(args: ParseResults<TestArguments>) =
member _.Options = args.GetResults Cached @ args.GetResults Unfolds
member x.Cache = x.Options |> List.exists (function Cached -> true | _ -> false)
member x.Unfolds = x.Options |> List.exists (function Unfolds -> true | _ -> false)
member _.Test = args.GetResult(Name,Test.Favorite)
member _.ErrorCutoff = args.GetResult(ErrorCutoff,10000L)
member _.TestsPerSecond = args.GetResult(TestsPerSecond,1000)
member _.Duration = args.GetResult(DurationM,30.) |> TimeSpan.FromMinutes
member _.Test = args.GetResult(Name, Test.Favorite)
member _.ErrorCutoff = args.GetResult(ErrorCutoff, 10000L)
member _.TestsPerSecond = args.GetResult(TestsPerSecond, 1000)
member _.Duration = args.GetResult(DurationM, 30.) |> TimeSpan.FromMinutes
member x.ReportingIntervals =
match args.GetResults(ReportIntervalS) with
| [] -> TimeSpan.FromSeconds 10.|> Seq.singleton
Expand All @@ -194,7 +196,7 @@ and TestInfo(args: ParseResults<TestArguments>) =
| Some (Es sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore
log.Information("Running transactions in-process against EventStore with storage options: {options:l}", x.Options)
storeLog, Storage.EventStore.config (log,storeLog) (cache, x.Unfolds) sargs
storeLog, Storage.EventStore.config (log, storeLog) (cache, x.Unfolds) sargs
| Some (MsSql sargs) ->
let storeLog = createStoreLog false
log.Information("Running transactions in-process against MsSql with storage options: {options:l}", x.Options)
Expand All @@ -211,10 +213,10 @@ and TestInfo(args: ParseResults<TestArguments>) =
log.Warning("Running transactions in-process against Volatile Store with storage options: {options:l}", x.Options)
createStoreLog false, Storage.MemoryStore.config ()
member _.Tests =
match args.GetResult(Name,Favorite) with
match args.GetResult(Name, Favorite) with
| Favorite -> Tests.Favorite
| SaveForLater -> Tests.SaveForLater
| Todo -> Tests.Todo (args.GetResult(Size,100))
| Todo -> Tests.Todo (args.GetResult(Size, 100))
and Test = Favorite | SaveForLater | Todo
and CosmosModeType = Container | Db | Serverless

Expand All @@ -225,7 +227,7 @@ let createStoreLog verbose verboseConsole maybeSeqEndpoint =
let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink())
let c = c.WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink())
let level =
match verbose,verboseConsole with
match verbose, verboseConsole with
| true, true -> LogEventLevel.Debug
| false, true -> LogEventLevel.Information
| _ -> LogEventLevel.Warning
Expand Down Expand Up @@ -254,24 +256,24 @@ module LoadTest =
with e -> domainLog.Warning(e, "Test threw an exception"); e.Reraise () }
execute
let private createResultLog fileName = LoggerConfiguration().WriteTo.File(fileName).CreateLogger()
let run (log: ILogger) (verbose,verboseConsole,maybeSeq) reportFilename (args: ParseResults<TestArguments>) =
let run (log: ILogger) (verbose, verboseConsole, maybeSeq) reportFilename (args: ParseResults<TestArguments>) =
let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq
let a = TestInfo args
let storeLog, storeConfig, httpClient: ILogger * Storage.StorageConfig option * HttpClient option =
match args.TryGetSubCommand() with
| Some (Web wargs) ->
let uri = wargs.GetResult(WebArguments.Endpoint,"https://localhost:5001") |> Uri
log.Information("Running web test targeting: {url}", uri)
createStoreLog false, None, new HttpClient(BaseAddress=uri) |> Some
createStoreLog false, None, new HttpClient(BaseAddress = uri) |> Some
| _ ->
let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog)
let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog)
storeLog, Some storeConfig, None
let test, duration = a.Tests, a.Duration
let runSingleTest : ClientId -> Async<unit> =
match storeConfig, httpClient with
| None, Some client ->
let execForClient = Tests.executeRemote client test
decorateWithLogger (log,verbose) execForClient
decorateWithLogger (log, verbose) execForClient
| Some storeConfig, _ ->
let services = ServiceCollection()
Services.register(services, storeConfig, storeLog)
Expand Down Expand Up @@ -325,7 +327,7 @@ module CosmosInit =
match iargs.TryGetSubCommand() with
| Some (InitArguments.Cosmos sargs) ->
let skipStoredProc = iargs.Contains InitArguments.SkipStoredProc
let client,dName,cName = connect log sargs
let client, dName, cName = connect log sargs
let mode = (CosmosInitInfo iargs).ProvisioningMode
match mode with
| CosmosInit.Provisioning.Container ru ->
Expand All @@ -337,7 +339,7 @@ module CosmosInit =
| CosmosInit.Provisioning.Serverless ->
let modeStr = "Serverless"
log.Information("Provisioning `Equinox.CosmosStore` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr)
return! CosmosInit.init log client (dName,cName) mode skipStoredProc
return! CosmosInit.init log client (dName, cName) mode skipStoredProc
| _ -> failwith "please specify a `cosmos` endpoint" }

module SqlInit =
Expand All @@ -346,13 +348,13 @@ module SqlInit =
match iargs.TryGetSubCommand() with
| Some (ConfigArguments.MsSql sargs) ->
let a = Storage.Sql.Ms.Info(sargs)
Storage.Sql.Ms.connect log (a.ConnectionString,a.Credentials,a.Schema,true) |> Async.Ignore |> Async.RunSynchronously
Storage.Sql.Ms.connect log (a.ConnectionString, a.Credentials, a.Schema, true) |> Async.Ignore |> Async.RunSynchronously
| Some (ConfigArguments.MySql sargs) ->
let a = Storage.Sql.My.Info(sargs)
Storage.Sql.My.connect log (a.ConnectionString,a.Credentials,true) |> Async.Ignore |> Async.RunSynchronously
Storage.Sql.My.connect log (a.ConnectionString, a.Credentials, true) |> Async.Ignore |> Async.RunSynchronously
| Some (ConfigArguments.Postgres sargs) ->
let a = Storage.Sql.Pg.Info(sargs)
Storage.Sql.Pg.connect log (a.ConnectionString,a.Credentials,a.Schema,true) |> Async.Ignore |> Async.RunSynchronously
Storage.Sql.Pg.connect log (a.ConnectionString, a.Credentials, a.Schema, true) |> Async.Ignore |> Async.RunSynchronously
| _ -> failwith "please specify a `ms`,`my` or `pg` endpoint" }

module CosmosStats =
Expand All @@ -365,17 +367,17 @@ module CosmosStats =
let run (log : ILogger, _verboseConsole, _maybeSeq) (args : ParseResults<StatsArguments>) = async {
match args.TryGetSubCommand() with
| Some (StatsArguments.Cosmos sargs) ->
let doS,doD,doE = args.Contains StatsArguments.Streams, args.Contains StatsArguments.Documents, args.Contains StatsArguments.Events
let doS, doD, doE = args.Contains StatsArguments.Streams, args.Contains StatsArguments.Documents, args.Contains StatsArguments.Events
let doS = doS || (not doD && not doE) // default to counting streams only unless otherwise specified
let inParallel = args.Contains Parallel
let client,dName,cName = CosmosInit.connect log sargs
let client, dName, cName = CosmosInit.connect log sargs
let container = client.GetContainer(dName, cName)
let ops =
[ if doS then yield "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """
if doD then yield "Documents", """SELECT VALUE COUNT(1) FROM c"""
if doE then yield "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """ ]
log.Information("Computing {measures} ({mode})", Seq.map fst ops, (if inParallel then "in parallel" else "serially"))
ops |> Seq.map (fun (name,sql) -> async {
ops |> Seq.map (fun (name, sql) -> async {
log.Debug("Running query: {sql}", sql)
let res = container.QueryValue<int>(sql)
log.Information("{stat}: {result:N0}", name, res)})
Expand All @@ -389,31 +391,35 @@ module Dump =
let run (log : ILogger, verboseConsole, maybeSeq) (args : ParseResults<DumpArguments>) =
let a = DumpInfo args
let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq
let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog)
let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog)
let doU, doE = not (args.Contains EventsOnly), not (args.Contains UnfoldsOnly)
let doC,doJ,doP,doT = args.Contains Correlation, not (args.Contains JsonSkip), args.Contains Pretty, not (args.Contains TimeRegular)
let doC, doJ, doT = args.Contains Correlation, not (args.Contains JsonSkip), not (args.Contains TimeRegular)
let cat = Services.StreamResolver(storeConfig)

let streams = args.GetResults DumpArguments.Stream
log.ForContext("streams",streams).Information("Reading...")
let initial = List.empty
let fold state events = (events,state) ||> Seq.foldBack (fun e l -> e :: l)
let fold state events = (events, state) ||> Seq.foldBack (fun e l -> e :: l)
let tryDecode (x : FsCodec.ITimelineEvent<byte[]>) = Some x
let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ -> failwith "No mapCausation"))
let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold),fun _state -> failwith "no snapshot required"
let fo = if doP then Newtonsoft.Json.Formatting.Indented else Newtonsoft.Json.Formatting.None
let render fo (data : byte[]) =
let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required"
let formatUnfolds, formatEvents =
let indentedOptions = FsCodec.SystemTextJson.Options.Create(indent = true)
let prettify : string -> _ = System.Text.Json.JsonDocument.Parse >> fun d -> System.Text.Json.JsonSerializer.Serialize(d, indentedOptions)
if args.Contains FlattenUnfolds then id else prettify
, if args.Contains Pretty then prettify else id
let render format (data : byte[]) =
try match data with
| null | [||] -> null
| _ when doJ -> System.Text.Encoding.UTF8.GetString data |> Newtonsoft.Json.Linq.JObject.Parse |> fun x -> x.ToString fo
| _ -> sprintf "(%d chars)" (System.Text.Encoding.UTF8.GetString(data).Length)
with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise()
| _ when doJ -> System.Text.Encoding.UTF8.GetString data |> format
| _ -> $"(%d{System.Text.Encoding.UTF8.GetString(data).Length} chars)"
with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "JSON Parse failure - use SkipJson option to inhibit"); reraise()
let readStream (streamName : FsCodec.StreamName) = async {
let stream = cat.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName
let stream = cat.Resolve(idCodec, fold, initial, isOriginAndSnapshot) streamName
let! _token, events = stream.Load(storeLog, allowStale = false)
let mutable prevTs = None
for x in events |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do
let ty,render = if x.IsUnfold then "U", render Newtonsoft.Json.Formatting.Indented else "E", render fo
let ty, render = if x.IsUnfold then "U", render formatUnfolds else "E", render formatEvents
let interval =
match prevTs with Some p when not x.IsUnfold -> Some (x.Timestamp - p) | _ -> None
|> function
Expand All @@ -431,7 +437,7 @@ module Dump =
streams
|> Seq.map readStream
|> Async.Parallel
|> Async.Ignore
|> Async.Ignore<unit[]>

[<EntryPoint>]
let main argv =
Expand All @@ -448,8 +454,8 @@ let main argv =
| Dump dargs -> Dump.run (log, verboseConsole, maybeSeq) dargs |> Async.RunSynchronously
| Stats sargs -> CosmosStats.run (log, verboseConsole, maybeSeq) sargs |> Async.RunSynchronously
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
let reportFilename = args.GetResult(LogFile, programName + ".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose, verboseConsole, maybeSeq) reportFilename rargs
| _ -> failwith "Please specify a valid subcommand :- init, config, dump, stats or run"
0
with e -> log.Debug(e, "Fatal error; exiting"); reraise ()
Expand Down

0 comments on commit 0767c57

Please sign in to comment.