From 0767c578b9b33cddedf87c1f54c4417d7bcf17be Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 10 Mar 2022 12:06:51 +0000 Subject: [PATCH] Fix eqx dump handling of null values (#319) Switched from Newtonsoft to STJ --- CHANGELOG.md | 4 +- tools/Equinox.Tool/Equinox.Tool.fsproj | 11 ---- tools/Equinox.Tool/Program.fs | 76 ++++++++++++++------------ 3 files changed, 44 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd6fda2b2..5ea92cb50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,12 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added +- `eqx dump`/`Equinox.Tool`: Add `-F` option to opt out of pretty printing unfolds [#319](https://github.com/jet/equinox/pull/319) - `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314) ### Changed -- `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313) +- `eqx dump`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing for events [#313](https://github.com/jet/equinox/pull/313) - `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314) - `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308) - `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308) @@ -32,6 +33,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Fixed +- `eqx dump`/`Equinox.Tool`: Use `System.Text.Json` for pretty printing to handle `null` values correctly [#319](https://github.com/jet/equinox/pull/319) - `EventStore`: Revise test rig to target a Docker-hosted cluster [#317](https://github.com/jet/equinox/pull/317) - `EventStore/SqlStreamStore`: rename `Equinox.XXXStore.Log.Event` -> `Metric` to match `CosmosStore` [#311](https://github.com/jet/equinox/pull/311) - `SqlStreamStore`: Fix `Metric` key to be `ssEvt` (was `esEvt`) [#311](https://github.com/jet/equinox/pull/311) diff --git a/tools/Equinox.Tool/Equinox.Tool.fsproj b/tools/Equinox.Tool/Equinox.Tool.fsproj index 3da660158..d3d1eb9b1 100644 --- a/tools/Equinox.Tool/Equinox.Tool.fsproj +++ b/tools/Equinox.Tool/Equinox.Tool.fsproj @@ -6,7 +6,6 @@ 5 false - true true Equinox.Tool @@ -36,16 +35,6 @@ - - - - - - - - - - diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index c2b1af7e5..b43819b43 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -91,6 +91,7 @@ and []DumpArguments = | [] Correlation | [] JsonSkip | [] Pretty + | [] FlattenUnfolds | [] TimeRegular | [] UnfoldsOnly | [] EventsOnly @@ -105,6 +106,7 @@ and []DumpArguments = | Correlation -> "Include Correlation/Causation identifiers" | JsonSkip -> "Don't attempt to decode JSON" | Pretty -> "Pretty print the JSON over multiple lines" + | FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds" | TimeRegular -> "Don't humanize time intervals between events" | UnfoldsOnly -> "Exclude Events. Default: show both Events and Unfolds" | EventsOnly -> "Exclude Unfolds/Snapshots. Default: show both Events and Unfolds." @@ -122,7 +124,7 @@ and DumpInfo(args: ParseResults) = storeLog, Storage.Cosmos.config log storeConfig (Storage.Cosmos.Info sargs) | Some (DumpArguments.Es sargs) -> let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore - storeLog, Storage.EventStore.config (log,storeLog) storeConfig sargs + storeLog, Storage.EventStore.config (log, storeLog) storeConfig sargs | Some (DumpArguments.MsSql sargs) -> let storeLog = createStoreLog false storeLog, Storage.Sql.Ms.config log storeConfig sargs @@ -175,10 +177,10 @@ and TestInfo(args: ParseResults) = member _.Options = args.GetResults Cached @ args.GetResults Unfolds member x.Cache = x.Options |> List.exists (function Cached -> true | _ -> false) member x.Unfolds = x.Options |> List.exists (function Unfolds -> true | _ -> false) - member _.Test = args.GetResult(Name,Test.Favorite) - member _.ErrorCutoff = args.GetResult(ErrorCutoff,10000L) - member _.TestsPerSecond = args.GetResult(TestsPerSecond,1000) - member _.Duration = args.GetResult(DurationM,30.) |> TimeSpan.FromMinutes + member _.Test = args.GetResult(Name, Test.Favorite) + member _.ErrorCutoff = args.GetResult(ErrorCutoff, 10000L) + member _.TestsPerSecond = args.GetResult(TestsPerSecond, 1000) + member _.Duration = args.GetResult(DurationM, 30.) |> TimeSpan.FromMinutes member x.ReportingIntervals = match args.GetResults(ReportIntervalS) with | [] -> TimeSpan.FromSeconds 10.|> Seq.singleton @@ -194,7 +196,7 @@ and TestInfo(args: ParseResults) = | Some (Es sargs) -> let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore log.Information("Running transactions in-process against EventStore with storage options: {options:l}", x.Options) - storeLog, Storage.EventStore.config (log,storeLog) (cache, x.Unfolds) sargs + storeLog, Storage.EventStore.config (log, storeLog) (cache, x.Unfolds) sargs | Some (MsSql sargs) -> let storeLog = createStoreLog false log.Information("Running transactions in-process against MsSql with storage options: {options:l}", x.Options) @@ -211,10 +213,10 @@ and TestInfo(args: ParseResults) = log.Warning("Running transactions in-process against Volatile Store with storage options: {options:l}", x.Options) createStoreLog false, Storage.MemoryStore.config () member _.Tests = - match args.GetResult(Name,Favorite) with + match args.GetResult(Name, Favorite) with | Favorite -> Tests.Favorite | SaveForLater -> Tests.SaveForLater - | Todo -> Tests.Todo (args.GetResult(Size,100)) + | Todo -> Tests.Todo (args.GetResult(Size, 100)) and Test = Favorite | SaveForLater | Todo and CosmosModeType = Container | Db | Serverless @@ -225,7 +227,7 @@ let createStoreLog verbose verboseConsole maybeSeqEndpoint = let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) let c = c.WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink()) let level = - match verbose,verboseConsole with + match verbose, verboseConsole with | true, true -> LogEventLevel.Debug | false, true -> LogEventLevel.Information | _ -> LogEventLevel.Warning @@ -254,7 +256,7 @@ module LoadTest = with e -> domainLog.Warning(e, "Test threw an exception"); e.Reraise () } execute let private createResultLog fileName = LoggerConfiguration().WriteTo.File(fileName).CreateLogger() - let run (log: ILogger) (verbose,verboseConsole,maybeSeq) reportFilename (args: ParseResults) = + let run (log: ILogger) (verbose, verboseConsole, maybeSeq) reportFilename (args: ParseResults) = let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq let a = TestInfo args let storeLog, storeConfig, httpClient: ILogger * Storage.StorageConfig option * HttpClient option = @@ -262,16 +264,16 @@ module LoadTest = | Some (Web wargs) -> let uri = wargs.GetResult(WebArguments.Endpoint,"https://localhost:5001") |> Uri log.Information("Running web test targeting: {url}", uri) - createStoreLog false, None, new HttpClient(BaseAddress=uri) |> Some + createStoreLog false, None, new HttpClient(BaseAddress = uri) |> Some | _ -> - let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog) + let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog) storeLog, Some storeConfig, None let test, duration = a.Tests, a.Duration let runSingleTest : ClientId -> Async = match storeConfig, httpClient with | None, Some client -> let execForClient = Tests.executeRemote client test - decorateWithLogger (log,verbose) execForClient + decorateWithLogger (log, verbose) execForClient | Some storeConfig, _ -> let services = ServiceCollection() Services.register(services, storeConfig, storeLog) @@ -325,7 +327,7 @@ module CosmosInit = match iargs.TryGetSubCommand() with | Some (InitArguments.Cosmos sargs) -> let skipStoredProc = iargs.Contains InitArguments.SkipStoredProc - let client,dName,cName = connect log sargs + let client, dName, cName = connect log sargs let mode = (CosmosInitInfo iargs).ProvisioningMode match mode with | CosmosInit.Provisioning.Container ru -> @@ -337,7 +339,7 @@ module CosmosInit = | CosmosInit.Provisioning.Serverless -> let modeStr = "Serverless" log.Information("Provisioning `Equinox.CosmosStore` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr) - return! CosmosInit.init log client (dName,cName) mode skipStoredProc + return! CosmosInit.init log client (dName, cName) mode skipStoredProc | _ -> failwith "please specify a `cosmos` endpoint" } module SqlInit = @@ -346,13 +348,13 @@ module SqlInit = match iargs.TryGetSubCommand() with | Some (ConfigArguments.MsSql sargs) -> let a = Storage.Sql.Ms.Info(sargs) - Storage.Sql.Ms.connect log (a.ConnectionString,a.Credentials,a.Schema,true) |> Async.Ignore |> Async.RunSynchronously + Storage.Sql.Ms.connect log (a.ConnectionString, a.Credentials, a.Schema, true) |> Async.Ignore |> Async.RunSynchronously | Some (ConfigArguments.MySql sargs) -> let a = Storage.Sql.My.Info(sargs) - Storage.Sql.My.connect log (a.ConnectionString,a.Credentials,true) |> Async.Ignore |> Async.RunSynchronously + Storage.Sql.My.connect log (a.ConnectionString, a.Credentials, true) |> Async.Ignore |> Async.RunSynchronously | Some (ConfigArguments.Postgres sargs) -> let a = Storage.Sql.Pg.Info(sargs) - Storage.Sql.Pg.connect log (a.ConnectionString,a.Credentials,a.Schema,true) |> Async.Ignore |> Async.RunSynchronously + Storage.Sql.Pg.connect log (a.ConnectionString, a.Credentials, a.Schema, true) |> Async.Ignore |> Async.RunSynchronously | _ -> failwith "please specify a `ms`,`my` or `pg` endpoint" } module CosmosStats = @@ -365,17 +367,17 @@ module CosmosStats = let run (log : ILogger, _verboseConsole, _maybeSeq) (args : ParseResults) = async { match args.TryGetSubCommand() with | Some (StatsArguments.Cosmos sargs) -> - let doS,doD,doE = args.Contains StatsArguments.Streams, args.Contains StatsArguments.Documents, args.Contains StatsArguments.Events + let doS, doD, doE = args.Contains StatsArguments.Streams, args.Contains StatsArguments.Documents, args.Contains StatsArguments.Events let doS = doS || (not doD && not doE) // default to counting streams only unless otherwise specified let inParallel = args.Contains Parallel - let client,dName,cName = CosmosInit.connect log sargs + let client, dName, cName = CosmosInit.connect log sargs let container = client.GetContainer(dName, cName) let ops = [ if doS then yield "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """ if doD then yield "Documents", """SELECT VALUE COUNT(1) FROM c""" if doE then yield "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """ ] log.Information("Computing {measures} ({mode})", Seq.map fst ops, (if inParallel then "in parallel" else "serially")) - ops |> Seq.map (fun (name,sql) -> async { + ops |> Seq.map (fun (name, sql) -> async { log.Debug("Running query: {sql}", sql) let res = container.QueryValue(sql) log.Information("{stat}: {result:N0}", name, res)}) @@ -389,31 +391,35 @@ module Dump = let run (log : ILogger, verboseConsole, maybeSeq) (args : ParseResults) = let a = DumpInfo args let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq - let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog) + let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog) let doU, doE = not (args.Contains EventsOnly), not (args.Contains UnfoldsOnly) - let doC,doJ,doP,doT = args.Contains Correlation, not (args.Contains JsonSkip), args.Contains Pretty, not (args.Contains TimeRegular) + let doC, doJ, doT = args.Contains Correlation, not (args.Contains JsonSkip), not (args.Contains TimeRegular) let cat = Services.StreamResolver(storeConfig) let streams = args.GetResults DumpArguments.Stream log.ForContext("streams",streams).Information("Reading...") let initial = List.empty - let fold state events = (events,state) ||> Seq.foldBack (fun e l -> e :: l) + let fold state events = (events, state) ||> Seq.foldBack (fun e l -> e :: l) let tryDecode (x : FsCodec.ITimelineEvent) = Some x let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ -> failwith "No mapCausation")) - let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold),fun _state -> failwith "no snapshot required" - let fo = if doP then Newtonsoft.Json.Formatting.Indented else Newtonsoft.Json.Formatting.None - let render fo (data : byte[]) = + let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required" + let formatUnfolds, formatEvents = + let indentedOptions = FsCodec.SystemTextJson.Options.Create(indent = true) + let prettify : string -> _ = System.Text.Json.JsonDocument.Parse >> fun d -> System.Text.Json.JsonSerializer.Serialize(d, indentedOptions) + if args.Contains FlattenUnfolds then id else prettify + , if args.Contains Pretty then prettify else id + let render format (data : byte[]) = try match data with | null | [||] -> null - | _ when doJ -> System.Text.Encoding.UTF8.GetString data |> Newtonsoft.Json.Linq.JObject.Parse |> fun x -> x.ToString fo - | _ -> sprintf "(%d chars)" (System.Text.Encoding.UTF8.GetString(data).Length) - with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise() + | _ when doJ -> System.Text.Encoding.UTF8.GetString data |> format + | _ -> $"(%d{System.Text.Encoding.UTF8.GetString(data).Length} chars)" + with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "JSON Parse failure - use SkipJson option to inhibit"); reraise() let readStream (streamName : FsCodec.StreamName) = async { - let stream = cat.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName + let stream = cat.Resolve(idCodec, fold, initial, isOriginAndSnapshot) streamName let! _token, events = stream.Load(storeLog, allowStale = false) let mutable prevTs = None for x in events |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do - let ty,render = if x.IsUnfold then "U", render Newtonsoft.Json.Formatting.Indented else "E", render fo + let ty, render = if x.IsUnfold then "U", render formatUnfolds else "E", render formatEvents let interval = match prevTs with Some p when not x.IsUnfold -> Some (x.Timestamp - p) | _ -> None |> function @@ -431,7 +437,7 @@ module Dump = streams |> Seq.map readStream |> Async.Parallel - |> Async.Ignore + |> Async.Ignore [] let main argv = @@ -448,8 +454,8 @@ let main argv = | Dump dargs -> Dump.run (log, verboseConsole, maybeSeq) dargs |> Async.RunSynchronously | Stats sargs -> CosmosStats.run (log, verboseConsole, maybeSeq) sargs |> Async.RunSynchronously | Run rargs -> - let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName - LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs + let reportFilename = args.GetResult(LogFile, programName + ".log") |> fun n -> System.IO.FileInfo(n).FullName + LoadTest.run log (verbose, verboseConsole, maybeSeq) reportFilename rargs | _ -> failwith "Please specify a valid subcommand :- init, config, dump, stats or run" 0 with e -> log.Debug(e, "Fatal error; exiting"); reraise ()