-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add telemetry to the ingestion pipeline #220
base: master
Are you sure you want to change the base?
Conversation
…s a tag on metrics in future
…ing it as a tag on metrics in future" This reverts commit 68403f4.
| `category` | The category being processed | | ||
| `stream_name` | The full stream name being processed | | ||
| `stream_id` | The id of the stream being processed | | ||
| `batch_size` | The size of the batch being processed | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dont like this overloading of the term batch.
I used to therm them a streamspan, but that's obv not ideal as a name wrt OT.
Perhaps events_count
: Number of events being handled
Also the next one was even more confusing for me. Maybe events_oldest_timestamp
?
Should the index
of the first/oldest event be included?
@@ -1,5 +1,6 @@ | |||
namespace Propulsion.Streams | |||
|
|||
open System.Diagnostics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
order!
| `stream_name` | The full stream name being processed | | ||
| `stream_id` | The id of the stream being processed | | ||
| `batch_size` | The size of the batch being processed | | ||
| `first_timestamp` | The receive timestamp of the first event in the batch being handled | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See preceding comment. Also it's the write timestamp more than the receive (and its app controlled so could be relatively old). e.g. if this is an event being synced from another store, it could be years ago. So maybe just lose the receive
?
@@ -36,25 +33,18 @@ let CheckpointConnectionString = | |||
| null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" | |||
| s -> s | |||
|
|||
let decider categoryName id = | |||
let client = Equinox.MessageDb.MessageDbClient(ConnectionString) | |||
let ctx = Equinox.MessageDb.MessageDbContext(client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest of the system tends not to use this contraction (or at least, it shouldnt)
|
||
let stats = stats log | ||
|
||
let handle _ _ _ = task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this and the others should arguable use StartConcurrent and Async<'T> handlers
87d1f43
to
8dbba09
Compare
cc26351
to
d4757fd
Compare
Adds some basic telemetry to the ingestion pipeline.
All attributes are prefixed with
propulsion.
category
stream_name
stream_id
batch_size
first_timestamp
lead_time_ms
The above screenshot shows a run of the
It doesn't read the tail event again
test. It appends 20 events via an equinox decider before starting to poll. You can see it create the checkpoints table, read from it, poll for events, and then fetching two batches before processing a stream batch. Then you see the tailSleepInterval kick in with a wait for 1s before storing the checkpoint and ending the test.This gives users the opportunity to ship these spans to various observability providers and ask questions like "What's my average lead time for processing category X"