diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index 43696094..825deb9e 100755
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -482,3 +482,37 @@ Resources:
 - [low level documentation of the client settings](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
 - [thorough free book](https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf)
 - [medium post covering some high level structures that Jet explored in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8).
+
+
+# Telemetry
+
+Propulsion emits OpenTelemetry spans for stream processing. All span attributes are prefixed with the `propulsion.` namespace.
+
+## {category} process
+
+One span is emitted per batch dispatched to a stream handler; the span's name is derived from the category being processed.
+
+| Attribute         | Description |
+|-------------------|-------------|
+| `category`        | The category being processed |
+| `stream_name`     | The full stream name being processed |
+| `stream_id`       | The id of the stream being processed |
+| `batch_size`      | The number of events in the batch being handled |
+| `first_timestamp` | The timestamp of the first event in the batch being handled |
+| `lead_time_ms`    | The [lead time](https://www.merriam-webster.com/dictionary/lead%20time) in milliseconds between the first event's timestamp and completion of the handler |
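+
+For example, spans can be collected by registering a `TracerProvider` that subscribes to the
+`Propulsion` source (a minimal sketch, not part of Propulsion itself: it assumes the
+`OpenTelemetry` SDK and `OpenTelemetry.Exporter.Console` packages are referenced):
+
+```fsharp
+open OpenTelemetry
+open OpenTelemetry.Trace
+
+// Listen to the "Propulsion" ActivitySource and write the resulting spans
+// to the console; dispose the provider on shutdown to flush pending spans
+let tracerProvider =
+    Sdk.CreateTracerProviderBuilder()
+        .AddSource("Propulsion")
+        .AddConsoleExporter()
+        .Build()
+```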
diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs
index c6998aa9..4dfdf83c 100644
--- a/src/Propulsion.Feed/FeedSource.fs
+++ b/src/Propulsion.Feed/FeedSource.fs
@@ -269,9 +269,7 @@ type TailingFeedSource
     let crawl trancheId (wasLast, startPos) ct = taskSeq {
         if wasLast then do! Task.delay tailSleepInterval ct
-        try let batches = crawl.Invoke(trancheId, startPos, ct)
-            for batch in batches do
-                yield batch
+        try yield! crawl.Invoke(trancheId, startPos, ct)
         with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
             match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
             match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
 
diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs
index b6000c01..ebb8469c 100755
--- a/src/Propulsion/Streams.fs
+++ b/src/Propulsion/Streams.fs
@@ -1,5 +1,6 @@
 namespace Propulsion.Streams
 
+open System.Diagnostics
 open Propulsion
 open Propulsion.Internal
 open Serilog
@@ -848,6 +849,8 @@ module Dispatcher =
         member _.AwaitCapacity(ct) = inner.AwaitButRelease(ct)
         member _.TryReplenish(pending, markStarted, project) = tryFillDispatcher pending markStarted project
 
+    let private source = new ActivitySource("Propulsion")
+
     /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
     type Concurrent<'P, 'R, 'E, 'F> internal
         (   inner : ItemDispatcher<Result<'P, 'E>, 'F>,
@@ -857,9 +860,25 @@ module Dispatcher =
            (    maxDop, project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<struct (bool, Result<'P, 'E>)>, interpretProgress) =
-        let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
+        let project struct (startTs: int64, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
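+            // Emits one "{category} process" span per handler invocation; StartActivity
+            // returns null when no listener is attached, so all tagging is guarded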
+            use act = source.StartActivity("process", ActivityKind.Consumer)
+            if act <> null then
+                let struct (category, streamId) = FsCodec.StreamName.splitCategoryAndStreamId item.stream
+                act.DisplayName <- $"{category} process"
+                act.SetTag("propulsion.stream_name", item.stream)
+                   .SetTag("propulsion.stream_id", streamId)
+                   .SetTag("propulsion.category", category)
+                   .SetTag("propulsion.batch_size", item.span.Length)
+                   .SetTag("propulsion.first_timestamp", item.span[0].Timestamp)
+                   |> ignore
             let! struct (progressed, res) = project item.stream item.span ct
-            return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) }
+            let elapsed = Stopwatch.elapsed startTs
+            if act <> null then
+                let oldestItemTs = item.span[0].Timestamp
+                act.SetTag("propulsion.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore
+            return struct (elapsed, item.stream, progressed, res) }
         Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress)
     static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) =
         let project sn span ct = task {
diff --git a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
index 0e8f6fe1..ec43d995 100644
--- a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
+++ b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
@@ -9,7 +9,11 @@
+    <PackageReference Include="Equinox.MessageDb" />
+    <PackageReference Include="OpenTelemetry" />
+    <PackageReference Include="OpenTelemetry.Exporter.Console" />
+    <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs
index 5de46a98..35a6d439 100644
--- a/tests/Propulsion.MessageDb.Integration/Tests.fs
+++ b/tests/Propulsion.MessageDb.Integration/Tests.fs
@@ -1,7 +1,5 @@
-module Propulsion.MessageDb.Integration.Tests
+module Propulsion.MessageDb.Integration
 
-open Npgsql
-open NpgsqlTypes
 open Propulsion.Internal
 open Propulsion.MessageDb
 open Swensen.Unquote
@@ -10,6 +8,11 @@ open System.Collections.Generic
 open System.Diagnostics
 open System.Threading.Tasks
 open Xunit
+open OpenTelemetry
+open OpenTelemetry.Trace
+open OpenTelemetry.Resources
+
+let source = new ActivitySource("Propulsion.MessageDb.Integration")
 
 module Simple =
     type Hello = { name : string }
     type Event =
         | Hello of Hello
         interface TypeShape.UnionContract.IUnionContract
     let codec = FsCodec.SystemTextJson.Codec.Create<Event>()
-
-let createStreamMessage streamName =
-    let cmd = NpgsqlBatchCommand()
-    cmd.CommandText <- "select 1 from write_message(@Id::text, @StreamName, @EventType, @Data, null, null)"
-    cmd.Parameters.AddWithValue("Id", NpgsqlDbType.Uuid, Guid.NewGuid()) |> ignore
-    cmd.Parameters.AddWithValue("StreamName", NpgsqlDbType.Text, streamName) |> ignore
-    cmd.Parameters.AddWithValue("EventType", NpgsqlDbType.Text, "Hello") |> ignore
-    cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore
-    cmd
+    type State = unit
+    let initial = ()
+    let fold state events = state
 
 let ConnectionString =
     match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with
@@ -36,25 +33,19 @@ let CheckpointConnectionString =
     | null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres"
     | s -> s
 
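+// Resolves an Equinox.MessageDb Decider used to write the test fixture events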
+let decider categoryName id =
+    let client = Equinox.MessageDb.MessageDbClient(ConnectionString)
+    let ctx = Equinox.MessageDb.MessageDbContext(client)
+    let category = Equinox.MessageDb.MessageDbCategory(ctx, Simple.codec, Simple.fold, Simple.initial)
+    Equinox.Decider.resolve Serilog.Log.Logger category categoryName (Equinox.StreamId.gen string id)
 
-let connect () = task {
-    let conn = new NpgsqlConnection(ConnectionString)
-    do! conn.OpenAsync()
-    return conn
-}
-
-let writeMessagesToStream (conn: NpgsqlConnection) streamName = task {
-    let batch = conn.CreateBatch()
-    for _ in 1..20 do
-        let cmd = createStreamMessage streamName
-        batch.BatchCommands.Add(cmd)
-    do! batch.ExecuteNonQueryAsync() :> Task }
-
-let writeMessagesToCategory conn category = task {
+let writeMessagesToCategory category = task {
     for _ in 1..50 do
-        let streamName = $"{category}-{Guid.NewGuid():N}"
-        do! writeMessagesToStream conn streamName
-}
+        let streamId = Guid.NewGuid().ToString("N")
+        let decider = decider category streamId
+        let decide _ = List.replicate 20 (Simple.Event.Hello { name = "world" })
+        do! decider.Transact(decide, load = Equinox.LoadOption.AssumeEmpty) }
 
 let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with
                     member _.HandleOk x = ()
                     member _.HandleExn(log, x) = () }
 
 let makeCheckpoints consumerGroup = task {
     let checkpoints = ReaderCheckpoint.CheckpointStore(CheckpointConnectionString, "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10)
     do! checkpoints.CreateSchemaIfNotExists()
     return checkpoints }
 
-[<Fact>]
-let ``It processes events for a category`` () = task {
-    use! conn = connect ()
-    let log = Serilog.Log.Logger
-    let consumerGroup = $"{Guid.NewGuid():N}"
-    let category1 = $"{Guid.NewGuid():N}"
-    let category2 = $"{Guid.NewGuid():N}"
-    do! writeMessagesToCategory conn category1
-    do! writeMessagesToCategory conn category2
-    let! checkpoints = makeCheckpoints consumerGroup
-    let stats = stats log
-    let mutable stop = ignore
-    let handled = HashSet<_>()
-    let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
-        lock handled (fun _ ->
-            for evt in events do
-                handled.Add((stream, evt.Index)) |> ignore)
-        test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
-        if handled.Count >= 2000 then
-            stop ()
-        return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
-    use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
-    let source = MessageDbSource(
-        log, TimeSpan.FromMinutes 1,
-        ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
-        checkpoints, sink, [| category1; category2 |])
-    use src = source.Start()
-    stop <- src.Stop
-
-    Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore
-
-    do! src.Await()
-
-    // 2000 total events
-    test <@ handled.Count = 2000 @>
-    // 20 in each stream
-    test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
-    // they were handled in order within streams
-    let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
-    test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }
-
 type ActivityCapture() =
     let operations = ResizeArray()
     let listener =
         let l = new ActivityListener(ShouldListenTo = (fun s -> s.Name = "Npgsql"),
                                      Sample = (fun _ -> ActivitySamplingResult.AllDataAndRecorded),
                                      ActivityStopped = (fun act -> operations.Add act))
         ActivitySource.AddActivityListener(l)
         l
     member _.Operations = operations
     interface IDisposable with
         member _.Dispose() = listener.Dispose()
 
-[<Fact>]
-let ``It doesn't read the tail event again`` () = task {
-    let log = Serilog.LoggerConfiguration().CreateLogger()
-    let consumerGroup = $"{Guid.NewGuid():N}"
-    let category = $"{Guid.NewGuid():N}"
-    use! conn = connect ()
-    do! writeMessagesToStream conn $"{category}-1"
-    let! checkpoints = makeCheckpoints consumerGroup
-    let stats = stats log
+type Tests() =
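+    // Exports spans from Equinox, Propulsion, Npgsql and these tests to the console
+    // and to a local OTLP endpoint (assumed to be listening on localhost:4317)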
+    let sdk =
+        Sdk.CreateTracerProviderBuilder()
+           .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Tests"))
+           .AddSource("Equinox")
+           .AddSource("Equinox.MessageDb")
+           .AddSource("Propulsion")
+           .AddSource("Propulsion.MessageDb.Integration")
+           .AddSource("Npgsql")
+           .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317"))
+           .AddConsoleExporter()
+           .Build()
+
+    [<Fact>]
+    member _.``It processes events for a category`` () = task {
+        use _ = source.StartActivity("It processes events for a category", ActivityKind.Server)
+        let log = Serilog.Log.Logger
+        let consumerGroup = $"{Guid.NewGuid():N}"
+        let category1 = $"{Guid.NewGuid():N}"
+        let category2 = $"{Guid.NewGuid():N}"
+        do! writeMessagesToCategory category1
+        do! writeMessagesToCategory category2
+        let! checkpoints = makeCheckpoints consumerGroup
+        let stats = stats log
+        let mutable stop = ignore
+        let handled = HashSet<_>()
+        let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
+            lock handled (fun _ -> for evt in events do handled.Add((stream, evt.Index)) |> ignore)
+            test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
+            if handled.Count >= 2000 then stop ()
+            return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
+        use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
+        let source = MessageDbSource(
+            log, TimeSpan.FromMinutes 1,
+            ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
+            checkpoints, sink, [| category1; category2 |])
+        use src = source.Start()
+        stop <- src.Stop
+
+        Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore
+
+        do! src.Await()
+
+        // 2000 total events
+        test <@ handled.Count = 2000 @>
+        // 20 in each stream
+        test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
+        // they were handled in order within streams
+        let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
+        test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }
+
+    [<Fact>]
+    member _.``It doesn't read the tail event again`` () = task {
+        use _ = source.StartActivity("It doesn't read the tail event again", ActivityKind.Server)
+        let log = Serilog.LoggerConfiguration().CreateLogger()
+        let consumerGroup = $"{Guid.NewGuid():N}"
+        let category = $"{Guid.NewGuid():N}"
+        let decider = decider category "1"
+        do! decider.Transact((fun _ -> List.replicate 20 (Simple.Hello { name = "world" })), load = Equinox.LoadOption.AssumeEmpty)
+        let! checkpoints = makeCheckpoints consumerGroup
+
+        let stats = stats log
+
+        let handle _ _ _ = task {
+            return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
+        use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
+        let batchSize = 10
+        let source = MessageDbSource(
+            log, TimeSpan.FromMilliseconds 1000,
+            ConnectionString, batchSize, TimeSpan.FromMilliseconds 10000,
+            checkpoints, sink, [| category |])
+
+        use capture = new ActivityCapture()
+
+        do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task
+
+        // 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
+        test <@ capture.Operations.Count = 5 @> }
-    let handle _ _ _ = task {
-        return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
-    use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
-    let batchSize = 10
-    let source = MessageDbSource(
-        log, TimeSpan.FromMilliseconds 1000,
-        ConnectionString, batchSize, TimeSpan.FromMilliseconds 1000,
-        checkpoints, sink, [| category |])
-
-    use capture = new ActivityCapture()
-
-    do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task
-
-    // 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
-    test <@ capture.Operations.Count = 5 @> }
+
+    interface IDisposable with
+        member _.Dispose() = sdk.Shutdown() |> ignore