Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add telemetry to the ingestion pipeline #220

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,20 @@ Resources:
- [low level documentation of the client settings](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
- [thorough free book](https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf)
- [medium post covering some high level structures that Jet explored in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8).


# Telemetry

Propulsion emits OpenTelemetry spans for stream processing. All span attributes are prefixed with the `propulsion.`
namespace

## {category} process

| Attribute | Description |
|-------------------|------------------------------------------------------------------------------------------------------------------|
| `category` | The category being processed |
| `stream_name` | The full stream name being processed |
| `stream_id` | The id of the stream being processed |
| `batch_size` | The size of the batch being processed |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont like this overloading of the term batch.
I used to therm them a streamspan, but that's obv not ideal as a name wrt OT.
Perhaps events_count: Number of events being handled
Also the next one was even more confusing for me. Maybe events_oldest_timestamp ?
Should the index of the first/oldest event be included?

| `first_timestamp` | The receive timestamp of the first event in the batch being handled |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See preceding comment. Also it's the write timestamp more than the receive (and its app controlled so could be relatively old). e.g. if this is an event being synced from another store, it could be years ago. So maybe just lose the receive ?

| `lead_time_ms` | The [lead time](https://www.merriam-webster.com/dictionary/lead%20time) in milliseconds for processing the batch |
4 changes: 1 addition & 3 deletions src/Propulsion.Feed/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ type TailingFeedSource

let crawl trancheId (wasLast, startPos) ct = taskSeq {
if wasLast then do! Task.delay tailSleepInterval ct
try let batches = crawl.Invoke(trancheId, startPos, ct)
for batch in batches do
yield batch
try yield! crawl.Invoke(trancheId, startPos, ct)
bartelink marked this conversation as resolved.
Show resolved Hide resolved
with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
Expand Down
21 changes: 19 additions & 2 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Propulsion.Streams

open System.Diagnostics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

order!

open Propulsion
open Propulsion.Internal
open Serilog
Expand Down Expand Up @@ -848,6 +849,8 @@ module Dispatcher =
member _.AwaitCapacity(ct) = inner.AwaitButRelease(ct)
member _.TryReplenish(pending, markStarted, project) = tryFillDispatcher pending markStarted project

let private source = new ActivitySource("Propulsion")

/// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
type Concurrent<'P, 'R, 'E, 'F> internal
( inner : ItemDispatcher<Result<'P, 'E>, 'F>,
Expand All @@ -857,9 +860,23 @@ module Dispatcher =
( maxDop,
project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<struct (bool * Result<'P, 'E>)>,
interpretProgress) =
let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
let project struct (startTs: int64, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
use act = source.StartActivity("process", ActivityKind.Consumer)
if act <> null then
let struct(category, streamId) = FsCodec.StreamName.splitCategoryAndStreamId item.stream
act.DisplayName <- $"{category} process"
act.SetTag("propulsion.stream_name", item.stream)
.SetTag("propulsion.stream_id", streamId)
.SetTag("propulsion.category", category)
.SetTag("propulsion.batch_size", item.span.Length)
.SetTag("propulsion.first_timestamp", item.span[0].Timestamp)
|> ignore
let! struct (progressed, res) = project item.stream item.span ct
return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) }
let elapsed = Stopwatch.elapsed startTs
if act <> null then
let oldestItemTs = item.span[0].Timestamp
act.SetTag("propulsion.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore
return struct (elapsed, item.stream, progressed, res) }
Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress)
static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) =
let project sn span ct = task {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MessageDb" Version="4.0.0-rc.11" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
<PackageReference Include="OpenTelemetry" Version="1.5.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.5.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.5.0" />
<PackageReference Include="TypeShape" Version="10.0.0" />
</ItemGroup>
<ItemGroup>
Expand Down
193 changes: 99 additions & 94 deletions tests/Propulsion.MessageDb.Integration/Tests.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Propulsion.MessageDb.Integration.Tests
module Propulsion.MessageDb.Integration

open Npgsql
open NpgsqlTypes
open Propulsion.Internal
open Propulsion.MessageDb
open Swensen.Unquote
Expand All @@ -10,22 +8,21 @@ open System.Collections.Generic
open System.Diagnostics
open System.Threading.Tasks
open Xunit
open OpenTelemetry
open OpenTelemetry.Trace
open OpenTelemetry.Resources

let source = new ActivitySource("Propulsion.MessageDb.Integration")

module Simple =
type Hello = { name : string}
type Event =
| Hello of Hello
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.SystemTextJson.Codec.Create<Event>()

let createStreamMessage streamName =
let cmd = NpgsqlBatchCommand()
cmd.CommandText <- "select 1 from write_message(@Id::text, @StreamName, @EventType, @Data, null, null)"
cmd.Parameters.AddWithValue("Id", NpgsqlDbType.Uuid, Guid.NewGuid()) |> ignore
cmd.Parameters.AddWithValue("StreamName", NpgsqlDbType.Text, streamName) |> ignore
cmd.Parameters.AddWithValue("EventType", NpgsqlDbType.Text, "Hello") |> ignore
cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore
cmd
type State = unit
let initial = ()
let fold state events = state

let ConnectionString =
match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with
Expand All @@ -36,25 +33,18 @@ let CheckpointConnectionString =
| null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres"
| s -> s

let decider categoryName id =
let client = Equinox.MessageDb.MessageDbClient(ConnectionString)
let ctx = Equinox.MessageDb.MessageDbContext(client)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest of the system tends not to use this contraction (or at least, it shouldnt)

let category = Equinox.MessageDb.MessageDbCategory(ctx, Simple.codec, Simple.fold, Simple.initial)
Equinox.Decider.resolve Serilog.Log.Logger category categoryName (Equinox.StreamId.gen string id)

let connect () = task {
let conn = new NpgsqlConnection(ConnectionString)
do! conn.OpenAsync()
return conn
}

let writeMessagesToStream (conn: NpgsqlConnection) streamName = task {
let batch = conn.CreateBatch()
for _ in 1..20 do
let cmd = createStreamMessage streamName
batch.BatchCommands.Add(cmd)
do! batch.ExecuteNonQueryAsync() :> Task }

let writeMessagesToCategory conn category = task {
let writeMessagesToCategory category = task {
for _ in 1..50 do
let streamName = $"{category}-{Guid.NewGuid():N}"
do! writeMessagesToStream conn streamName
}
let streamId = Guid.NewGuid().ToString("N")
let decider = decider category streamId
let decide _ = List.replicate 20 (Simple.Event.Hello { name = "world" })
do! decider.Transact(decide, load = Equinox.LoadOption.AssumeEmpty) }

let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
with member _.HandleOk x = ()
Expand All @@ -65,47 +55,6 @@ let makeCheckpoints consumerGroup = task {
do! checkpoints.CreateSchemaIfNotExists()
return checkpoints }

[<Fact>]
let ``It processes events for a category`` () = task {
use! conn = connect ()
let log = Serilog.Log.Logger
let consumerGroup = $"{Guid.NewGuid():N}"
let category1 = $"{Guid.NewGuid():N}"
let category2 = $"{Guid.NewGuid():N}"
do! writeMessagesToCategory conn category1
do! writeMessagesToCategory conn category2
let! checkpoints = makeCheckpoints consumerGroup
let stats = stats log
let mutable stop = ignore
let handled = HashSet<_>()
let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
lock handled (fun _ ->
for evt in events do
handled.Add((stream, evt.Index)) |> ignore)
test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
if handled.Count >= 2000 then
stop ()
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let source = MessageDbSource(
log, TimeSpan.FromMinutes 1,
ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
checkpoints, sink, [| category1; category2 |])
use src = source.Start()
stop <- src.Stop

Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore

do! src.Await()

// 2000 total events
test <@ handled.Count = 2000 @>
// 20 in each stream
test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
// they were handled in order within streams
let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }

type ActivityCapture() =
let operations = ResizeArray()
let listener =
Expand All @@ -121,29 +70,85 @@ type ActivityCapture() =
interface IDisposable with
member _.Dispose() = listener.Dispose()

[<Fact>]
let ``It doesn't read the tail event again`` () = task {
let log = Serilog.LoggerConfiguration().CreateLogger()
let consumerGroup = $"{Guid.NewGuid():N}"
let category = $"{Guid.NewGuid():N}"
use! conn = connect ()
do! writeMessagesToStream conn $"{category}-1"
let! checkpoints = makeCheckpoints consumerGroup

let stats = stats log
type Tests() =
let sdk =
Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Tests"))
.AddSource("Equinox")
.AddSource("Equinox.MessageDb")
.AddSource("Propulsion")
.AddSource("Propulsion.MessageDb.Integration")
.AddSource("Npgsql")
.AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317"))
.AddConsoleExporter()
.Build()

[<Fact>]
let ``It processes events for a category`` () = task {
use _ = source.StartActivity("It processes events for a category", ActivityKind.Server)
let log = Serilog.Log.Logger
let consumerGroup = $"{Guid.NewGuid():N}"
let category1 = $"{Guid.NewGuid():N}"
let category2 = $"{Guid.NewGuid():N}"
do! writeMessagesToCategory category1
do! writeMessagesToCategory category2
let! checkpoints = makeCheckpoints consumerGroup
let stats = stats log
let mutable stop = ignore
let handled = HashSet<_>()
let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
lock handled (fun _ -> for evt in events do handled.Add((stream, evt.Index)) |> ignore)
test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
if handled.Count >= 2000 then stop ()
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let source = MessageDbSource(
log, TimeSpan.FromMinutes 1,
ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
checkpoints, sink, [| category1; category2 |])
use src = source.Start()
stop <- src.Stop

Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore

do! src.Await()

// 2000 total events
test <@ handled.Count = 2000 @>
// 20 in each stream
test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
// they were handled in order within streams
let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }

[<Fact>]
let ``It doesn't read the tail event again`` () = task {
use _ = source.StartActivity("It doesn't read the tail event again", ActivityKind.Server)
let log = Serilog.LoggerConfiguration().CreateLogger()
let consumerGroup = $"{Guid.NewGuid():N}"
let category = $"{Guid.NewGuid():N}"
let decider = decider category "1"
do! decider.Transact((fun _ -> List.replicate 20 (Simple.Hello { name = "world" })), load = Equinox.LoadOption.AssumeEmpty)
let! checkpoints = makeCheckpoints consumerGroup

let stats = stats log

let handle _ _ _ = task {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and the others should arguable use StartConcurrent and Async<'T> handlers

return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
let batchSize = 10
let source = MessageDbSource(
log, TimeSpan.FromMilliseconds 1000,
ConnectionString, batchSize, TimeSpan.FromMilliseconds 10000,
checkpoints, sink, [| category |])

use capture = new ActivityCapture()

do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task

// 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
test <@ capture.Operations.Count = 5 @> }

let handle _ _ _ = task {
return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
let batchSize = 10
let source = MessageDbSource(
log, TimeSpan.FromMilliseconds 1000,
ConnectionString, batchSize, TimeSpan.FromMilliseconds 1000,
checkpoints, sink, [| category |])

use capture = new ActivityCapture()

do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task

// 3 batches fetched, 1 checkpoint read, and 1 checkpoint write
test <@ capture.Operations.Count = 5 @> }
interface IDisposable with
member _.Dispose() = sdk.Shutdown() |> ignore