add reservoir timers #171
base: master
Conversation
Force-pushed from 98706b1 to 72bc838
stats_test.go
Outdated
counter = sink.record
if !strings.Contains(counter, expected) {
	t.Error("wanted counter value of test.___f=i:1|c, got", counter)
expected = "test.__host=i:1|c"
Note: this test change is out of scope of this work, but the test was previously flaky: the ordering of reserved_tag vs test.__host was not deterministic.
re-design with the notion of sample rate appended to the metric
improve naming
sink.go
Outdated
@@ -6,6 +6,7 @@ type Sink interface {
 	FlushCounter(name string, value uint64)
 	FlushGauge(name string, value uint64)
 	FlushTimer(name string, value float64)
+	FlushAggregatedTimer(name string, value, sampleRate float64)
Because of this we'll need a new major release.
Library usage should be backwards compatible, and additionally there is a feature flag to control the new behaviour; however, anything that implements this interface will break.
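To illustrate the break: any existing implementation of Sink must grow the new method to keep compiling. A minimal sketch (nullSink is a hypothetical implementer, not part of the library):

type nullSink struct{}

func (nullSink) FlushCounter(name string, value uint64) {}
func (nullSink) FlushGauge(name string, value uint64)   {}
func (nullSink) FlushTimer(name string, value float64)  {}

// Without this new method, nullSink stops satisfying the Sink interface.
func (nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {}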
Force-pushed from 7748bb6 to 4a82e5c
Force-pushed from 9a8b85d to 5dd8757
stats.go
Outdated
if t.count < t.capacity {
	t.values = append(t.values, value)
} else {
	t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
}
Treating this more as a ring buffer when at capacity would be better since it would provide a more accurate representation of the most recent timings (whereas here we only update the first timing):
diff --git a/stats.go b/stats.go
index 70a582b..99a0a0a 100644
--- a/stats.go
+++ b/stats.go
@@ -346,6 +346,7 @@ type reservoirTimer struct {
capacity int
values []float64
count int
+ off int
mu sync.Mutex
}
@@ -364,7 +365,8 @@ func (t *reservoirTimer) AddValue(value float64) {
if t.count < t.capacity {
t.values = append(t.values, value)
} else {
- t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
+ t.values[t.off%len(t.values)] = value
+ t.off++
}
t.count++
Also, making capacity a power of two would allow us to use a mask instead of an expensive modulo operation. This can be cheaply calculated using the bits package:
// Round capacity up to the nearest power of two (capacity must be >= 1);
// using capacity-1 keeps values that are already powers of two unchanged.
func nextPowerOfTwo(capacity int) uint {
	return 1 << bits.Len(uint(capacity-1))
}
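With the capacity-1 adjustment noted above, nextPowerOfTwo(5) == 8 and nextPowerOfTwo(8) == 8, whereas 1 << bits.Len(uint(capacity)) would double an exact power of two (8 -> 16).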
stats.go
Outdated
capacity int
values   []float64
count    int
mu       sync.Mutex
Super nit: make the Mutex the first field (it's advantageous performance-wise to put the most accessed field first).
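A sketch of the suggested ordering for the struct above:

type reservoirTimer struct {
	mu       sync.Mutex // most accessed field first
	values   []float64
	capacity int
	count    int
}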
stats.go
Outdated
@@ -336,6 +404,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
 }

 type statStore struct {
+	// todo: no idea how memory is managed here, when are the map entries ever deleted?
The assumption is that stat names are stable over the lifetime of a process, but this is a known issue for processes that incorrectly use this library and cause a cardinality explosion of stat names. See:
- [RFC] replace sync.Map with lru.Cache #158 (original issue)
- stats: delete unused Counters and Timers when flushing #160 (my proposed fix)
net_sink.go
Outdated
}

func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate)
For performance reasons it would be great to avoid fmt.Sprintf here if possible.
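One option is strconv.AppendFloat into a byte slice; a sketch, assuming the sink can write raw bytes (the helper name and buffer handling are illustrative, not this PR's code):

import "strconv"

// appendTimerSuffix builds the "|ms|@<rate>\n" suffix without fmt.Sprintf,
// avoiding the fmt package's reflection and temporary-string overhead.
func appendTimerSuffix(buf []byte, sampleRate float64) []byte {
	buf = append(buf, "|ms|@"...)
	buf = strconv.AppendFloat(buf, sampleRate, 'f', 1, 64) // matches %.1f
	return append(buf, '\n')
}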
stats.go
Outdated
for _, value := range timer.CollectedValue() {
	s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
}
s.timers.Delete(key) // delete it from the map so it's not flushed again
Deletion is expensive, and since we assume the names of stats are stable it will just lead to the entry being re-added to the map at a later time. For counters we only flush non-zero values, and I think we could do something similar with the reservoirTimer type (say, not flush if it has zero values, and trim the slice to zero elements on flush).
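A sketch of that idea, with hypothetical names (drain is not a method in this PR): skip the flush when empty, and trim in place so both the map entry and the backing array get reused:

// drain copies out the collected values and empties the reservoir without
// freeing its backing array or removing the timer from the map.
func (t *reservoirTimer) drain() []float64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.values) == 0 {
		return nil // nothing to flush, mirroring the non-zero counter rule
	}
	out := append([]float64(nil), t.values...) // copy before trimming
	t.values = t.values[:0]                    // length zero, capacity kept
	return out
}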
stats.go
Outdated
-	return &statStore{sink: sink}
+	return &statStore{
+		sink: sink,
+		conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient
+	}
The sync.OnceValue function can help eliminate these multiple loads.
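A sketch, assuming GetSettings is the zero-argument loader used in the diff above (sync.OnceValue requires Go 1.21+):

import "sync"

// loadSettings invokes GetSettings exactly once; later calls return the
// cached value, so the environment is only read a single time per process.
var loadSettings = sync.OnceValue(GetSettings)

// usage in the constructor:
//	return &statStore{sink: sink, conf: loadSettings()}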
optimize reservoirTimer by allocating and reusing memory
fix bug with sampleRate calculation
optimize FlushAggregatedTimer a bit
expand and fix test coverage
@charlievieth just did a first pass of your comments. Sincere thanks to you both for the clarification regarding how data/keys in the maps are reused, and for the ring buffer recommendation. I now have a lot of new todos to dive into, but I just pushed a pretty big change (ref: ea5ae6a); if/when you have a chance I would be eager to know whether it aligns with your thoughts. Fwiw I appreciate any and all nits, very much a Go noob.

Edit: had a logical bug which I addressed in e81d603
np! I'm on vacation the next few days but will take a look at your changes when I can.
stats.go
Outdated
t.values[t.overflow&t.ringMask] = value
t.overflow++

// todo: can i optimize this with xor?
I don't think you need to reset the overflow since you're masking (taking only the lower bits is equivalent to a modulo operation). Also using an unsigned int here would be a bit safer / clearer (makes the value always positive and unsigned overflow is well defined).
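A tiny illustration of the masking point (the constant and values here are arbitrary):

const ringMask = 8 - 1 // capacity 8, a power of two

// ringIndex maps an ever-increasing counter onto a buffer slot; the mask
// keeps only the low bits, e.g. 13 & 7 == 5, identical to 13 % 8.
func ringIndex(overflow uint) uint {
	return overflow & ringMask // unsigned wraparound is well defined
}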
Ah this implementation here is wrong
additionally remove variability of the reservoir size as we need to ensure this number is a power of two
improve comments
…ewDefaultStore and reservoir timers are enabled implicitly
optimize store construction
fix data race in reservoirTimer
stats.go
Outdated
}

func (t *reservoirTimer) Reset() {
	atomic.StoreUint64(&t.count, 0)
Don't we need a lock here?
It's already atomic, so the access should be protected, iiuc. I did add a test for data races (9762152), but actually this Reset isn't covered; let me cover it too.

I once had a data race that showed how atomics work here: inside a mutex I didn't use atomic access, while a different method accessed the same data atomically outside of the mutex. The rundown is that the access inside the mutex also needed to be atomic, and that by itself protects against the data race.
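A sketch of that rule using the fields from the diffs above (illustrative, not the exact PR code): once any method reads count atomically outside the lock, every access to count must be atomic, even under the mutex:

func (t *reservoirTimer) AddValue(value float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.values = append(t.values, value) // guarded by the mutex
	atomic.AddUint64(&t.count, 1)      // still atomic: Reset touches count without the lock
}

func (t *reservoirTimer) Reset() {
	atomic.StoreUint64(&t.count, 0) // pairs safely with the atomic add above
}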
I think this test should cover it: 2641924. It aggressively flushes (and Resets) with interleaving writes (changing the counter) to the same reservoirTimer allocation in the (statStore instance).timers map.
Nevermind I think you're right! Thank you @sokada1221
Add a reservoir implementation for timers. This relies on a new timer type, and using this type should allow for a more controlled stat load from timers.
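For reference, a sketch of the reservoir timer assembled from the review threads above; field and method names follow the diffs, but the details are illustrative rather than the final code:

import (
	"sync"
	"sync/atomic"
)

type reservoirTimer struct {
	mu       sync.Mutex // first field, per the review nit above
	values   []float64  // becomes a ring buffer once capacity is reached
	ringMask uint       // capacity-1, with capacity a power of two
	overflow uint       // next ring slot to overwrite when full
	count    uint64     // total observations, read atomically by Reset
}

func (t *reservoirTimer) AddValue(value float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if uint(len(t.values)) <= t.ringMask {
		t.values = append(t.values, value) // still filling the reservoir
	} else {
		t.values[t.overflow&t.ringMask] = value // overwrite the oldest slot
		t.overflow++
	}
	atomic.AddUint64(&t.count, 1)
}

One natural choice for the sample rate flushed alongside each value is the fraction of observed timings actually kept, e.g. float64(len(t.values)) / float64(atomic.LoadUint64(&t.count)).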