Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: use FsCodec.Encoded bodies #266

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ module private Impl =
type EventBody = byte[] // V4 defines one directly, here we shim it
module StreamSpan =

let toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray()
let toNativeEventBody (x: Propulsion.Sinks.EventBody): EventBody = FsCodec.Encoding.ToBlob(x).ToArray()

// Trimmed edition of what V4 exposes
module internal Equinox =
module CosmosStore =
Expand All @@ -31,11 +32,7 @@ module private Impl =
#else
module StreamSpan =

// v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory<byte> rather than assuming and/or offering optimization for JSON bodies
open System.Text.Json
let toNativeEventBody (x: EventBody): JsonElement =
if x.IsEmpty then JsonElement()
else JsonSerializer.Deserialize<JsonElement>(x.Span)
let toNativeEventBody (x: EventBody): Equinox.CosmosStore.Core.EncodedBody = FsCodec.SystemTextJson.Encoding.OfUtf8Encoded x
#endif

module Internal =
Expand Down Expand Up @@ -65,7 +62,7 @@ module Internal =
let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
let i = StreamSpan.index span
let n = StreamSpan.next span
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody
let mapData = FsCodec.Core.EventData.mapBodies StreamSpan.toNativeEventBody
#if COSMOSV3
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds")
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
Expand Down
14 changes: 6 additions & 8 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ module EquinoxSystemTextJsonParser =

type System.Text.Json.JsonElement with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
Expand All @@ -38,12 +37,13 @@ module EquinoxSystemTextJsonParser =
/// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
let d = EncodedBody.ofUnfoldBody (x.D, x.d) |> FsCodec.SystemTextJson.Encoding.ToEncodedUtf8
let m = EncodedBody.ofUnfoldBody (x.M, x.m) |> FsCodec.SystemTextJson.Encoding.ToEncodedUtf8
let inline len s = if isNull s then 0 else String.length s
let size = x.c.Length + FsCodec.Encoding.ByteCount d + FsCodec.Encoding.ByteCount m
+ len x.correlationId + len x.causationId + 80
FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
size = size, correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
Expand Down Expand Up @@ -87,8 +87,6 @@ module EquinoxNewtonsoftParser =
type Newtonsoft.Json.Linq.JObject with
member document.Cast<'T>() =
document.ToObject<'T>()
type Batch with
member _.MapData x = x

let timestamp (doc: Newtonsoft.Json.Linq.JObject) =
let unixEpoch = System.DateTime.UnixEpoch
Expand All @@ -101,7 +99,7 @@ module EquinoxNewtonsoftParser =
/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): StreamEvent seq =
let streamName = FsCodec.StreamName.parse batch.p // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
batch.e |> Seq.mapi (fun offset x -> streamName, FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp=x.t))
batch.e |> Seq.mapi (fun offset x -> streamName, FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, FsCodec.Encoding.OfBlob x.d, FsCodec.Encoding.OfBlob x.m, timestamp = x.t))

/// Collects all events with a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumStreamEvents d: StreamEvent seq =
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.15" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.2" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.20" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.15" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module Events =
let codec = FsCodec.Box.Codec.Create<Event>()
#else
#if DYNAMOSTORE
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Compression.EncodeUncompressed
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Encoder.Uncompressed
#else
#if !COSMOSV3
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
Expand Down Expand Up @@ -193,7 +193,7 @@ module CosmosStore =

let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cat = CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let cat = CosmosStoreCategory(context, Stream.Category, FsCodec.SystemTextJson.Encoder.Compressed Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let resolve = Equinox.Decider.forStream log cat
Service(Stream.id >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec" Version="3.0.0" />
<PackageReference Include="FsCodec" Version="3.0.4-alpha.0.15" />
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.27.0" ExcludeAssets="contentfiles" />
</ItemGroup>
Expand Down
7 changes: 3 additions & 4 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ module private Impl =
sw.Stop()
let totalStreams, chosenEvents, totalEvents, streamEvents =
let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten |> Array.ofSeq
let totalEvents = all |> Array.sumBy (fun x -> x.c.Length)
let totalEvents = all |> Array.sumBy _.c.Length
let mutable chosenEvents = 0
let chooseStream (span: AppendsEpoch.Events.StreamSpan) =
match maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) with
Expand Down Expand Up @@ -84,7 +84,7 @@ module private Impl =
| true, (items: FsCodec.ITimelineEvent<_>[]) ->
// NOTE this could throw if a span has been indexed, but the stream read is from a replica that does not yet have it
// the exception in that case will trigger a safe re-read from the last saved read position that a consumer has forwarded
// TOCONSIDER revise logic to share session key etc to rule this out
// TOCONSIDER revise logic to share session key etc. to rule this out
let events = Array.sub items (span.i - items[0].Index |> int) span.c.Length
for e in events -> struct (IndexStreamId.toStreamName span.p, e) |] }
let mutable prevLoaded, batchIndex = 0L, 0
Expand Down Expand Up @@ -123,13 +123,12 @@ type EventLoadMode =
* /// Defines the Context to use when loading the Event Data/Meta
storeContext: DynamoStoreContext
module internal EventLoadMode =
let private mapTimelineEvent = FsCodec.Core.TimelineEvent.Map(Func<_, _> FsCodec.Compression.EncodedToUtf8)
let private withData (eventsContext: Equinox.DynamoStore.Core.EventsContext) streamFilter =
fun sn (i, cs: string[]) ->
if streamFilter sn then
ValueSome (fun ct -> task {
let! events = eventsContext.Read(sn, ct, i, maxCount = cs.Length)
return events |> Array.map mapTimelineEvent })
return events })
else ValueNone
let private withoutData streamFilter =
fun sn (i, cs) ->
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.2" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.4-alpha.0.15" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ module Dynamo =
module internal Codec =

let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Compression.EncodeTryCompress
FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Encoder.Compressed
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ module Streams =

let private withUpconverter<'c, 'e when 'c :> TypeShape.UnionContract.IUnionContract> up: FsCodec.IEventCodec<'e, _, _> =
let down (_: 'e) = failwith "Unexpected"
FsCodec.SystemTextJson.Codec.Create<'e, 'c, _>(up, down) |> FsCodec.Compression.EncodeTryCompress
FsCodec.SystemTextJson.Codec.Create<'e, 'c, _>(up, down) |> FsCodec.Encoder.Compressed
let decWithIndex<'c when 'c :> TypeShape.UnionContract.IUnionContract> : FsCodec.IEventCodec<struct (int64 * 'c), _, _> =
let up (raw: FsCodec.ITimelineEvent<_>) e = struct (raw.Index, e)
withUpconverter<'c, struct (int64 * 'c)> up
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ module Internal =
let i = StreamSpan.index span
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
#if EVENTSTORE_LEGACY
let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _))
let! res = context.Sync(log, stream, i - 1L, span |> Array.map (FsCodec.Core.EventData.mapBodies FsCodec.Encoding.ToBlob))
#else
let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _), ct)
let! res = context.Sync(log, stream, i - 1L, span |> Array.map (FsCodec.Core.EventData.mapBodies FsCodec.Encoding.ToBlob), ct)
#endif
let res' =
match res with
Expand Down
3 changes: 1 addition & 2 deletions src/Propulsion.EventStore/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ module Mapping =
member x.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(x.CreatedEpoch)

let (|PropulsionTimelineEvent|) (x: RecordedEvent): Propulsion.Sinks.Event =
let inline len0ToNull (x: _[]) = match x with null -> ReadOnlyMemory.Empty | x when x.Length = 0 -> ReadOnlyMemory.Empty | x -> ReadOnlyMemory x
FsCodec.Core.TimelineEvent.Create(x.EventNumber, x.EventType, len0ToNull x.Data, len0ToNull x.Metadata, timestamp = x.Timestamp)
FsCodec.Core.TimelineEvent.Create(x.EventNumber, x.EventType, FsCodec.Encoding.OfBlob x.Data, FsCodec.Encoding.OfBlob x.Metadata, timestamp = x.Timestamp)

let (|PropulsionStreamEvent|) (x: RecordedEvent): Propulsion.Sinks.StreamEvent =
Propulsion.Streams.StreamName.internalParseSafe x.EventStreamId, (|PropulsionTimelineEvent|) x
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module private Impl =
for e in events do
let sn = Propulsion.Streams.StreamName.internalParseSafe e.EventStreamId
if streamFilter sn then
yield sn, Equinox.EventStoreDb.ClientCodec.timelineEvent e |]
yield sn, Equinox.EventStoreDb.ClientCodec.timelineEvent e |> FsCodec.Core.TimelineEvent.mapBodies FsCodec.Encoding.OfBlob |]
let private checkpointPos (xs: EventRecord[]) =
match Array.tryLast xs with Some e -> int64 e.Position.CommitPosition | None -> -1L
|> Propulsion.Feed.Position.parse
Expand Down
10 changes: 5 additions & 5 deletions src/Propulsion.Kafka/Codec.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ type [<NoEquality; NoComparison>] RenderedSpan =
module RenderedSpan =

let ofStreamSpan streamName (span: Event[]): RenderedSpan =
let ta (x: EventBody) = x.ToArray()
let ta (x: EventBody) = FsCodec.Encoding.ToBlob(x).ToArray()
{ s = FsCodec.StreamName.toString streamName
i = span[0].Index
e = span |> Array.map (fun x -> { c = x.EventType; t = x.Timestamp; d = ta x.Data; m = ta x.Meta }) }

let enum (span: RenderedSpan): StreamEvent seq =
let streamName = Propulsion.Streams.StreamName.internalParseSafe span.s
let td (x: byte[]): EventBody = System.ReadOnlyMemory x
let inline mkEvent offset (e: RenderedEvent) = FsCodec.Core.TimelineEvent.Create(span.i+int64 offset, e.c, td e.d, td e.m, timestamp = e.t)
let td (x: byte[]): EventBody = FsCodec.Encoding.OfBlob x
let inline mkEvent offset (e: RenderedEvent) = FsCodec.Core.TimelineEvent.Create(span.i + int64 offset, e.c, td e.d, td e.m, timestamp = e.t)
span.e |> Seq.mapi (fun i e -> streamName, mkEvent i e)

let parse (spanJson: string): StreamEvent seq =
Expand All @@ -92,7 +92,7 @@ type [<NoEquality; NoComparison>] RenderedSummary =
module RenderedSummary =

let ofStreamEvents (streamName: FsCodec.StreamName) (index: int64) (events: FsCodec.IEventData<EventBody> seq): RenderedSummary =
let ta (x: EventBody): byte[] = x.ToArray()
let ta (x: EventBody): byte[] = FsCodec.Encoding.ToBlob(x).ToArray()
{ s = FsCodec.StreamName.toString streamName
i = index
u = [| for x in events -> { c = x.EventType; t = x.Timestamp; d = ta x.Data; m = ta x.Meta } |] }
Expand All @@ -102,7 +102,7 @@ module RenderedSummary =

let enum (span: RenderedSummary): StreamEvent seq =
let streamName = Propulsion.Streams.StreamName.internalParseSafe span.s
seq { for e in span.u -> streamName, FsCodec.Core.TimelineEvent.Create(span.i, e.c, e.d, e.m, timestamp=e.t, isUnfold=true) }
seq { for e in span.u -> streamName, FsCodec.Core.TimelineEvent.Create(span.i, e.c, FsCodec.Encoding.OfBlob e.d, FsCodec.Encoding.OfBlob e.m, timestamp = e.t, isUnfold = true) }

let parse (spanJson: string): StreamEvent seq =
spanJson |> RenderedSummary.Parse |> enum
8 changes: 4 additions & 4 deletions src/Propulsion.Kafka/Consumers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ module private Impl =

/// guesstimate approximate message size in bytes
let approximateMessageBytes (m: Message<string, string>) =
let inline len (x: string) = match x with null -> 0 | x -> sizeof<char> * x.Length
16 + len m.Key + len m.Value |> int64
let inline stringLen (x: string) = match x with null -> 0 | x -> sizeof<char> * x.Length
16 + stringLen m.Key + stringLen m.Value |> int64

module private Binding =

Expand Down Expand Up @@ -259,7 +259,7 @@ module Core =
if m = null then invalidOp "Cannot dereference null message"
let data = System.Text.Encoding.UTF8.GetBytes m.Value
let context = { topic = result.Topic; partition = Binding.partitionValue result.Partition; offset = Binding.offsetValue result.Offset }
(ReadOnlyMemory data, box context)
(FsCodec.Encoding.OfBlob data, box context)

/// StreamsSink buffers and deduplicates messages from a contiguous stream with each event bearing a monotonically incrementing `Index`.
/// Where the messages we consume don't have such characteristics, we need to maintain a fake `Index` by keeping an int per stream in a dictionary
Expand Down Expand Up @@ -325,7 +325,7 @@ type StreamNameSequenceGenerator() =
member x.KeyValueToStreamEvent(KeyValue (k, v: string), ?eventType, ?defaultCategory): StreamEvent seq =
let sn = Core.parseMessageKey (defaultArg defaultCategory String.Empty) k
let e = FsCodec.Core.TimelineEvent.Create(x.GenerateIndex sn, defaultArg eventType String.Empty, System.Text.Encoding.UTF8.GetBytes v |> ReadOnlyMemory)
Seq.singleton (sn, e)
Seq.singleton (sn, FsCodec.Core.TimelineEvent.mapBodies FsCodec.Encoding.OfBlob e)

[<AbstractClass; Sealed>]
type Factory private () =
Expand Down
8 changes: 2 additions & 6 deletions src/Propulsion.MemoryStore/MemoryStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,9 @@ and MemoryStoreMonitor internal (log: Serilog.ILogger, positions: TranchePositio
if sink.IsCompleted && not sink.RanToCompletion then
return! sink.Wait() }

module TimelineEvent =

let mapEncoded = FsCodec.Core.TimelineEvent.Map(Func<_, _> FsCodec.Compression.EncodedToUtf8)

/// Coordinates forwarding of a VolatileStore's Committed events to a supplied Sink
/// Supports awaiting the (asynchronous) handling by the Sink of all Committed events from a given point in time
type MemoryStoreSource(log, store: Equinox.MemoryStore.VolatileStore<FsCodec.EncodedBody>, categoryFilter, sink) =
inherit MemoryStoreSource<FsCodec.EncodedBody>(log, store, categoryFilter, TimelineEvent.mapEncoded, sink)
type MemoryStoreSource(log, store: Equinox.MemoryStore.VolatileStore<FsCodec.Encoded>, categoryFilter, sink) =
inherit MemoryStoreSource<FsCodec.Encoded>(log, store, categoryFilter, id, sink)
new(log, store, categories, sink) =
MemoryStoreSource(log, store, (fun x -> Array.contains x categories), sink)
8 changes: 4 additions & 4 deletions src/Propulsion.MessageDb/MessageDbSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ module Internal =
do! conn.OpenAsync(ct)
return conn }

let private jsonNull = ReadOnlyMemory(System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(null))
let private jsonNull = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes null

type System.Data.Common.DbDataReader with
member reader.GetJson idx =
if reader.IsDBNull(idx) then jsonNull
else reader.GetString(idx) |> Text.Encoding.UTF8.GetBytes |> ReadOnlyMemory
else reader.GetString(idx) |> Text.Encoding.UTF8.GetBytes

type MessageDbCategoryClient(connectionString) =
let connect = createConnectionAndOpen connectionString
let parseRow (reader: System.Data.Common.DbDataReader) =
let et, data, meta = reader.GetString(1), reader.GetJson 2, reader.GetJson 3
let sz = data.Length + meta.Length + et.Length
let et, data, meta = reader.GetString(1), reader.GetJson 2 |> FsCodec.Encoding.OfBlob, reader.GetJson 3 |> FsCodec.Encoding.OfBlob
let sz = FsCodec.Encoding.ByteCount data + FsCodec.Encoding.ByteCount meta + et.Length
let event = FsCodec.Core.TimelineEvent.Create(
index = reader.GetInt64(0), // index within the stream, 0 based
eventType = et, data = data, meta = meta, eventId = reader.GetGuid(4),
Expand Down
Loading
Loading