From 72bc838ecc56dd776154aa5935731ebe10c8f7af Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 8 Jan 2025 13:25:27 -0500 Subject: [PATCH 01/32] prototype a simple reservoir for timers --- settings.go | 11 +++++ stats.go | 97 ++++++++++++++++++++++++++++++++++++++----- stats_test.go | 112 ++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 203 insertions(+), 17 deletions(-) diff --git a/settings.go b/settings.go index 5b6770e..3a4f268 100644 --- a/settings.go +++ b/settings.go @@ -20,6 +20,7 @@ const ( DefaultFlushIntervalS = 5 // DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false. DefaultLoggingSinkDisabled = false + DefaultTimerReservoirSize = 0 ) // The Settings type is used to configure gostats. gostats uses environment @@ -38,6 +39,7 @@ type Settings struct { // Disable the LoggingSink when USE_STATSD is false and use the NullSink instead. // This will cause all stats to be silently dropped. LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"` + TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"` } // An envError is an error that occurred parsing an environment variable @@ -101,6 +103,10 @@ func GetSettings() Settings { if err != nil { panic(err) } + timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize) + if err != nil { + panic(err) + } return Settings{ UseStatsd: useStatsd, StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost), @@ -108,6 +114,7 @@ func GetSettings() Settings { StatsdPort: statsdPort, FlushIntervalS: flushIntervalS, LoggingSinkDisabled: loggingSinkDisabled, + TimerReservoirSize: timerReservoirSize, } } @@ -115,3 +122,7 @@ func GetSettings() Settings { func (s *Settings) FlushInterval() time.Duration { return time.Duration(s.FlushIntervalS) * time.Second } + +func (s *Settings) isTimerReservoirEnabled() bool { + return s.TimerReservoirSize > 0 +} diff --git a/stats.go b/stats.go index 9f167bd..cd2ceb0 100644 --- a/stats.go +++ b/stats.go @@ -2,6 +2,7 @@ package stats import ( "context" + "math" "strconv" "sync" "sync/atomic" @@ -298,30 +299,69 @@ func (c *gauge) Value() uint64 { return atomic.LoadUint64(&c.value) } -type timer struct { +type timer interface { + time(time.Duration) + AddDuration(time.Duration) + AddValue(float64) + AllocateSpan() Timespan + Value() float64 +} + +type standardTimer struct { base time.Duration name string sink Sink } -func (t *timer) time(dur time.Duration) { +func (t *standardTimer) time(dur time.Duration) { t.AddDuration(dur) } -func (t *timer) AddDuration(dur time.Duration) { +func (t *standardTimer) AddDuration(dur time.Duration) { t.AddValue(float64(dur / t.base)) } -func (t *timer) AddValue(value float64) { +func (t *standardTimer) AddValue(value float64) { t.sink.FlushTimer(t.name, value) } -func (t *timer) AllocateSpan() Timespan { +func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } +func (t *standardTimer) Value() float64 { + return 0.0 // float zero value +} + +type reservoirTimer struct { + base time.Duration + name string + value uint64 +} + +func (t *reservoirTimer) time(dur time.Duration) { + t.AddDuration(dur) +} + +func (t *reservoirTimer) AddDuration(dur time.Duration) { + t.AddValue(float64(dur / t.base)) +} + +func (t *reservoirTimer) AddValue(value float64) { + // todo does this need to be atomtic? 
ideally for the the use case it won't/shouldn't be changed like a counter/gauge would be + atomic.StoreUint64(&t.value, math.Float64bits(value)) +} + +func (t *reservoirTimer) AllocateSpan() Timespan { + return ×pan{timer: t, start: time.Now()} +} + +func (t *reservoirTimer) Value() float64 { + return math.Float64frombits(atomic.LoadUint64(&t.value)) +} + type timespan struct { - timer *timer + timer timer start time.Time } @@ -340,6 +380,8 @@ type statStore struct { gauges sync.Map timers sync.Map + timerCount int + mu sync.RWMutex statGenerators []StatGenerator @@ -393,6 +435,20 @@ func (s *statStore) Flush() { return true }) + settings := GetSettings() // todo: move this to some shared memory + if settings.isTimerReservoirEnabled() { + s.timers.Range(func(key, v interface{}) bool { + // todo: maybe change this to not even add to the reservoir + // do not flush timers that are zero value + if value := v.(timer).Value(); value != 0.0 { + s.sink.FlushTimer(key.(string), v.(timer).Value()) + } + s.timers.Delete(key) + s.timerCount-- + return true + }) + } + flushableSink, ok := s.sink.(FlushableSink) if ok { flushableSink.Flush() @@ -490,14 +546,35 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags)) } -func (s *statStore) newTimer(serializedName string, base time.Duration) *timer { +func (s *statStore) newTimer(serializedName string, base time.Duration) timer { if v, ok := s.timers.Load(serializedName); ok { - return v.(*timer) + return v.(timer) + } + + var t timer + settings := GetSettings() // todo: move this to some shared memory + if settings.isTimerReservoirEnabled() { + t = &reservoirTimer{name: serializedName, base: base} + + // todo: > shouldn't be necessary + if s.timerCount >= settings.TimerReservoirSize { + // this will delete 1 random timer in the map + s.timers.Range(func(key, _ interface{}) bool { + s.timers.Delete(key) + return false + }) + s.timerCount-- + } + } else { + t = &standardTimer{name: serializedName, sink: s.sink, base: base} } - t := &timer{name: serializedName, sink: s.sink, base: base} + if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { - return v.(*timer) + return v.(timer) } + + s.timerCount++ + return t } diff --git a/stats_test.go b/stats_test.go index 37dbbed..329a396 100644 --- a/stats_test.go +++ b/stats_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "math/rand" + "os" "reflect" "strconv" "strings" @@ -78,9 +79,9 @@ func TestValidateTags(t *testing.T) { store.Flush() expected := "test:1|c" - counter := sink.record - if !strings.Contains(counter, expected) { - t.Error("wanted counter value of test:1|c, got", counter) + output := sink.record + if !strings.Contains(output, expected) && !strings.Contains(output, "reserved_tag") { + t.Errorf("Expected without reserved tags: '%s' Got: '%s'", expected, output) } // A reserved tag should trigger adding the reserved_tag counter @@ -89,10 +90,11 @@ func TestValidateTags(t *testing.T) { store.NewCounterWithTags("test", map[string]string{"host": "i"}).Inc() store.Flush() - expected = "reserved_tag:1|c\ntest.__host=i:1|c" - counter = sink.record - if !strings.Contains(counter, expected) { - t.Error("wanted counter value of test.___f=i:1|c, got", counter) + expected = "test.__host=i:1|c" + expectedReservedTag := "reserved_tag:1|c" + output = sink.record + if !strings.Contains(output, expected) && !strings.Contains(output, expectedReservedTag) { + t.Errorf("Expected: '%s' and '%s', 
In: '%s'", expected, expectedReservedTag, output) } } @@ -126,6 +128,102 @@ func TestMilliTimer(t *testing.T) { } } +func TestTimerResevoir_Disabled(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0") + if err != nil { + t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + } + + expectedStatCount := 1000 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 1000; i++ { + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written despite forced batching") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + +func TestTimerReservoir(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + } + + expectedStatCount := 100 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 1000; i++ { + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic + } + + if ts.String() != "" { + t.Errorf("Stats were written despite forced batching") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + +func TestTimerReservoir_FilteredZeros(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + } + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 1000; i++ { + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written despite forced batching") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats := strings.Split(ts.String(), "\n") + stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer + for _, stat := range stats { + value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number + if value == "0" { + t.Errorf("Got a zero value stat: %s", stat) + } + + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { sink := &testStatSink{} From f67aca44456d11b3843e431ea9878a4ddf66070e Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 8 Jan 2025 13:51:31 -0500 Subject: [PATCH 02/32] clarify comment --- stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats.go b/stats.go index cd2ceb0..d74adac 100644 --- a/stats.go +++ b/stats.go @@ -558,7 +558,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { // todo: > shouldn't be necessary if s.timerCount >= settings.TimerReservoirSize { - // this will 
delete 1 random timer in the map + // todo: this will delete 1 random timer in the map, this can probably be smarter s.timers.Range(func(key, _ interface{}) bool { s.timers.Delete(key) return false From 5cc34d9fc8e97a371fb69865b168bf0037284389 Mon Sep 17 00:00:00 2001 From: Amin Bashiri Date: Wed, 8 Jan 2025 14:57:49 -0500 Subject: [PATCH 03/32] Move s.timerCount inside timer.Range --- stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats.go b/stats.go index d74adac..26a0a80 100644 --- a/stats.go +++ b/stats.go @@ -561,9 +561,9 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { // todo: this will delete 1 random timer in the map, this can probably be smarter s.timers.Range(func(key, _ interface{}) bool { s.timers.Delete(key) + s.timerCount-- return false }) - s.timerCount-- } } else { t = &standardTimer{name: serializedName, sink: s.sink, base: base} From f7fc55e4179f080e9eafb299565283bb154e1664 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 8 Jan 2025 16:11:34 -0500 Subject: [PATCH 04/32] fix test messages --- stats_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stats_test.go b/stats_test.go index 329a396..ed0dd64 100644 --- a/stats_test.go +++ b/stats_test.go @@ -131,7 +131,7 @@ func TestMilliTimer(t *testing.T) { func TestTimerResevoir_Disabled(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0") if err != nil { - t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) } expectedStatCount := 1000 @@ -144,7 +144,7 @@ func TestTimerResevoir_Disabled(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written despite forced batching") + t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") } store.Flush() @@ -162,7 +162,7 @@ func TestTimerResevoir_Disabled(t *testing.T) { func TestTimerReservoir(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") if err != nil { - t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) } expectedStatCount := 100 @@ -175,7 +175,7 @@ func TestTimerReservoir(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written despite forced batching") + t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") } store.Flush() @@ -193,7 +193,7 @@ func TestTimerReservoir(t *testing.T) { func TestTimerReservoir_FilteredZeros(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") if err != nil { - t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) } ts, sink := setupTestNetSink(t, "tcp", false) @@ -204,7 +204,7 @@ func TestTimerReservoir_FilteredZeros(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written despite forced batching") + t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") } store.Flush() From 4a0662a6bc4b3fba7bb89a1c0e52317b562c37e6 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 8 Jan 2025 16:13:31 -0500 Subject: [PATCH 05/32] fix spelling --- stats_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/stats_test.go b/stats_test.go index ed0dd64..e5d6ce5 100644 --- a/stats_test.go +++ b/stats_test.go 
@@ -128,7 +128,7 @@ func TestMilliTimer(t *testing.T) { } } -func TestTimerResevoir_Disabled(t *testing.T) { +func TestTimerReservoir_Disabled(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0") if err != nil { t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) @@ -144,7 +144,7 @@ func TestTimerResevoir_Disabled(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") } store.Flush() @@ -175,7 +175,7 @@ func TestTimerReservoir(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") } store.Flush() @@ -204,7 +204,7 @@ func TestTimerReservoir_FilteredZeros(t *testing.T) { } if ts.String() != "" { - t.Errorf("Stats were written pre flush potentially clearing the resevoir too early") + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") } store.Flush() From 57ef42b02bc2d1332fc01e880421b2e78bade064 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 10 Jan 2025 17:33:49 -0500 Subject: [PATCH 06/32] re-design with timer reservoirs correctly independent per mertic re-design with the notion of sample rate appended to the metric --- logging_sink.go | 4 +++ mock/sink.go | 13 +++++++ net_sink.go | 20 +++++++---- net_sink_test.go | 6 ++++ null_sink.go | 2 ++ sink.go | 1 + stats.go | 93 +++++++++++++++++++++++++++++++----------------- stats_test.go | 21 +++++------ 8 files changed, 110 insertions(+), 50 deletions(-) diff --git a/logging_sink.go b/logging_sink.go index 135a9cf..2284cb8 100644 --- a/logging_sink.go +++ b/logging_sink.go @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) } +func (s *loggingSink) FlushTimerWithSampleRate(name string, value float64, _ float64) { + s.log(name, "timer", value) +} + func (s *loggingSink) Flush() { s.log("", "all stats", 0) } // Logger diff --git a/mock/sink.go b/mock/sink.go index 3a23c8f..d84662b 100644 --- a/mock/sink.go +++ b/mock/sink.go @@ -103,6 +103,19 @@ func (s *Sink) FlushTimer(name string, val float64) { atomic.AddInt64(&p.count, 1) } +// FlushTimer implements the stats.Sink.FlushTimer method and adds val to +// stat name. +func (s *Sink) FlushTimerWithSampleRate(name string, val float64, _ float64) { + timers := s.timers() + v, ok := timers.Load(name) + if !ok { + v, _ = timers.LoadOrStore(name, new(entry)) + } + p := v.(*entry) + atomicAddFloat64(&p.val, val) + atomic.AddInt64(&p.count, 1) +} + // LoadCounter returns the value for stat name and if it was found. func (s *Sink) LoadCounter(name string) (uint64, bool) { v, ok := s.counters().Load(name) diff --git a/net_sink.go b/net_sink.go index d4a42c0..9171e74 100644 --- a/net_sink.go +++ b/net_sink.go @@ -260,15 +260,23 @@ func (s *netSink) FlushGauge(name string, value uint64) { } func (s *netSink) FlushTimer(name string, value float64) { - // Since we mistakenly use floating point values to represent time - // durations this method is often passed an integer encoded as a - // float. 
Formatting integers is much faster (>2x) than formatting + s.optimizedFloatFlush(name, value, "|ms\n") +} + +func (s *netSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) { + suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate) + s.optimizedFloatFlush(name, value, suffix) +} + +func (s *netSink) optimizedFloatFlush(name string, value float64, suffix string) { + // Since we sometimes use floating point values (e.g. when representing time + // durations), data is often an integer encoded as a float. + // Formatting integers is much faster (>2x) than formatting // floats so use integer formatting whenever possible. - // if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value { - s.flushUint64(name, "|ms\n", uint64(value)) + s.flushUint64(name, suffix, uint64(value)) } else { - s.flushFloat64(name, "|ms\n", value) + s.flushFloat64(name, suffix, value) } } diff --git a/net_sink_test.go b/net_sink_test.go index de3203c..e100eae 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) { s.Unlock() } +func (s *testStatSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) { + s.Lock() + s.record += fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate) + s.Unlock() +} + func TestCreateTimer(t *testing.T) { sink := &testStatSink{} store := NewStore(sink, true) diff --git a/null_sink.go b/null_sink.go index 50669de..5488fa2 100644 --- a/null_sink.go +++ b/null_sink.go @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive +func (s nullSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) {} //nolint:revive + func (s nullSink) Flush() {} diff --git a/sink.go b/sink.go index 219a2bc..4822b56 100644 --- a/sink.go +++ b/sink.go @@ -6,6 +6,7 @@ type Sink interface { FlushCounter(name string, value uint64) FlushGauge(name string, value uint64) FlushTimer(name string, value float64) + FlushTimerWithSampleRate(name string, value float64, sampleRate float64) } // FlushableSink is an extension of Sink that provides a Flush() function that diff --git a/stats.go b/stats.go index 26a0a80..70c04ec 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,6 @@ package stats import ( "context" - "math" "strconv" "sync" "sync/atomic" @@ -304,7 +303,8 @@ type timer interface { AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan - Value() float64 + Values() []float64 + SampleRate() float64 } type standardTimer struct { @@ -329,14 +329,22 @@ func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *standardTimer) Value() float64 { - return 0.0 // float zero value +func (t *standardTimer) Values() []float64 { + return nil +} + +func (t *standardTimer) SampleRate() float64 { + return 0.0 // todo: using zero value of float64. the correct value would be 1.0 given 1 stat, hwoever that 1 stat is never stored, just flushed right away } type reservoirTimer struct { - base time.Duration - name string - value uint64 + base time.Duration + name string + capacity int + values []float64 + fill int // todo: the only purpose of this is to be faster than calculating len(values), is it worht it? 
+ count int + mu sync.Mutex } func (t *reservoirTimer) time(dur time.Duration) { @@ -348,16 +356,38 @@ func (t *reservoirTimer) AddDuration(dur time.Duration) { } func (t *reservoirTimer) AddValue(value float64) { - // todo does this need to be atomtic? ideally for the the use case it won't/shouldn't be changed like a counter/gauge would be - atomic.StoreUint64(&t.value, math.Float64bits(value)) + t.mu.Lock() + defer t.mu.Unlock() + + // todo: consider edge cases for < + if t.fill < t.capacity { + t.values = append(t.values, value) + } else { + // todo: discarding the oldest value when the reference is full, this can probably be smarter + t.values = append(t.values[1:], value) + t.fill-- + } + + t.fill++ + t.count++ } func (t *reservoirTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *reservoirTimer) Value() float64 { - return math.Float64frombits(atomic.LoadUint64(&t.value)) +func (t *reservoirTimer) Values() []float64 { + t.mu.Lock() + defer t.mu.Unlock() + + // todo: Return a copy of the values slice to avoid data races + valuesCopy := make([]float64, len(t.values)) + copy(valuesCopy, t.values) + return valuesCopy +} + +func (t *reservoirTimer) SampleRate() float64 { + return float64(t.fill) / float64(t.count) // todo: is it faster to store these values as float64 instead of converting here } type timespan struct { @@ -378,9 +408,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { type statStore struct { counters sync.Map gauges sync.Map - timers sync.Map - - timerCount int + timers sync.Map // todo: should be control this count, especially for reservoirs we will be storing a lot of these in memory before flushing them mu sync.RWMutex statGenerators []StatGenerator @@ -438,13 +466,13 @@ func (s *statStore) Flush() { settings := GetSettings() // todo: move this to some shared memory if settings.isTimerReservoirEnabled() { s.timers.Range(func(key, v interface{}) bool { - // todo: maybe change this to not even add to the reservoir - // do not flush timers that are zero value - if value := v.(timer).Value(); value != 0.0 { - s.sink.FlushTimer(key.(string), v.(timer).Value()) + timer := v.(timer) + sampleRate := timer.SampleRate() + for _, value := range timer.Values() { + s.sink.FlushTimerWithSampleRate(key.(string), value, sampleRate) } + s.timers.Delete(key) - s.timerCount-- return true }) } @@ -554,27 +582,28 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { var t timer settings := GetSettings() // todo: move this to some shared memory if settings.isTimerReservoirEnabled() { - t = &reservoirTimer{name: serializedName, base: base} - - // todo: > shouldn't be necessary - if s.timerCount >= settings.TimerReservoirSize { - // todo: this will delete 1 random timer in the map, this can probably be smarter - s.timers.Range(func(key, _ interface{}) bool { - s.timers.Delete(key) - s.timerCount-- - return false - }) + // todo: have defaults defined in a shared location + t = &reservoirTimer{ + name: serializedName, + base: base, + capacity: 100, + values: make([]float64, 0, 100), + fill: 0, + count: 0, } } else { - t = &standardTimer{name: serializedName, sink: s.sink, base: base} + t = &standardTimer{ + name: serializedName, + sink: s.sink, + base: base, + } } + // todo: why would the timer ever be replaced, will this hurt reservoirs or benefit them? or is it just redundant since we load above? 
if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { return v.(timer) } - s.timerCount++ - return t } diff --git a/stats_test.go b/stats_test.go index e5d6ce5..aea63f8 100644 --- a/stats_test.go +++ b/stats_test.go @@ -140,7 +140,7 @@ func TestTimerReservoir_Disabled(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) + store.NewTimer("test").AddValue(float64(i % 10)) } if ts.String() != "" { @@ -171,7 +171,7 @@ func TestTimerReservoir(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic + store.NewTimer("test").AddValue(float64(i % 10)) } if ts.String() != "" { @@ -190,17 +190,19 @@ func TestTimerReservoir(t *testing.T) { os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") } -func TestTimerReservoir_FilteredZeros(t *testing.T) { +func TestTimerReservoir_IndependantReservoirs(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") if err != nil { t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) } + expectedStatCount := 1000 + ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir } if ts.String() != "" { @@ -211,14 +213,9 @@ func TestTimerReservoir_FilteredZeros(t *testing.T) { time.Sleep(1001 * time.Millisecond) - stats := strings.Split(ts.String(), "\n") - stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer - for _, stat := range stats { - value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number - if value == "0" { - t.Errorf("Got a zero value stat: %s", stat) - } - + statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) } os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") From 4610f5592b319b43bf9ba7f92b19e8f9be4c2819 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 10 Jan 2025 17:51:02 -0500 Subject: [PATCH 07/32] add some more todos --- stats.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stats.go b/stats.go index 70c04ec..bf3a016 100644 --- a/stats.go +++ b/stats.go @@ -472,7 +472,7 @@ func (s *statStore) Flush() { s.sink.FlushTimerWithSampleRate(key.(string), value, sampleRate) } - s.timers.Delete(key) + s.timers.Delete(key) // todo: not sure if this cleanup is necessary return true }) } @@ -582,6 +582,9 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { var t timer settings := GetSettings() // todo: move this to some shared memory if settings.isTimerReservoirEnabled() { + // todo: if s.timers gets to a certain size, we can flush all timers and delete them from the map + // todo: no idea how memory was managed here before did we just expect the map of s.timers to just be replaced after it's filled? 
+ // todo: have defaults defined in a shared location t = &reservoirTimer{ name: serializedName, From cc908b5d6b82e2fb90533685d4ca6dd8897d4097 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 10 Jan 2025 18:57:47 -0500 Subject: [PATCH 08/32] clean up redundant code improve naming --- logging_sink.go | 4 ++-- mock/sink.go | 13 +++---------- net_sink.go | 2 +- net_sink_test.go | 2 +- null_sink.go | 2 +- sink.go | 2 +- stats.go | 10 +++++----- 7 files changed, 14 insertions(+), 21 deletions(-) diff --git a/logging_sink.go b/logging_sink.go index 2284cb8..0a407cb 100644 --- a/logging_sink.go +++ b/logging_sink.go @@ -92,8 +92,8 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) } -func (s *loggingSink) FlushTimerWithSampleRate(name string, value float64, _ float64) { - s.log(name, "timer", value) +func (s *loggingSink) FlushAggregatedTimer(name string, value float64, _ float64) { + s.FlushTimer(name, value) } func (s *loggingSink) Flush() { s.log("", "all stats", 0) } diff --git a/mock/sink.go b/mock/sink.go index d84662b..866e56b 100644 --- a/mock/sink.go +++ b/mock/sink.go @@ -103,17 +103,10 @@ func (s *Sink) FlushTimer(name string, val float64) { atomic.AddInt64(&p.count, 1) } -// FlushTimer implements the stats.Sink.FlushTimer method and adds val to +// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to // stat name. -func (s *Sink) FlushTimerWithSampleRate(name string, val float64, _ float64) { - timers := s.timers() - v, ok := timers.Load(name) - if !ok { - v, _ = timers.LoadOrStore(name, new(entry)) - } - p := v.(*entry) - atomicAddFloat64(&p.val, val) - atomic.AddInt64(&p.count, 1) +func (s *Sink) FlushAggregatedTimer(name string, val float64, _ float64) { + s.FlushTimer(name, val) } // LoadCounter returns the value for stat name and if it was found. 
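Note for reviewers: because Sink gains FlushAggregatedTimer in this patch, any out-of-tree sink has to add the method to keep satisfying the interface. A minimal sketch of what that looks like — only the method signatures are taken from sink.go in this series; the recordingSink type and its output format are illustrative:

package main

import "fmt"

// recordingSink is a hypothetical sink; it exists only to show the full
// method set a custom Sink needs once FlushAggregatedTimer is part of the
// interface.
type recordingSink struct{}

func (recordingSink) FlushCounter(name string, value uint64) { fmt.Println("counter", name, value) }
func (recordingSink) FlushGauge(name string, value uint64)   { fmt.Println("gauge", name, value) }
func (recordingSink) FlushTimer(name string, value float64)  { fmt.Println("timer", name, value) }

// The aggregated variant also receives the sample rate, so a sink can either
// forward it statsd-style ("|@<rate>") or rescale the value itself.
func (recordingSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	fmt.Printf("%s:%v|ms|@%.2f\n", name, value, sampleRate)
}

func main() {
	var s recordingSink
	s.FlushAggregatedTimer("request_time_ms", 42, 0.10) // prints request_time_ms:42|ms|@0.10
}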
diff --git a/net_sink.go b/net_sink.go index 9171e74..1877fb0 100644 --- a/net_sink.go +++ b/net_sink.go @@ -263,7 +263,7 @@ func (s *netSink) FlushTimer(name string, value float64) { s.optimizedFloatFlush(name, value, "|ms\n") } -func (s *netSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) { +func (s *netSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) { suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate) s.optimizedFloatFlush(name, value, suffix) } diff --git a/net_sink_test.go b/net_sink_test.go index e100eae..4c98f88 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -45,7 +45,7 @@ func (s *testStatSink) FlushTimer(name string, value float64) { s.Unlock() } -func (s *testStatSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) { +func (s *testStatSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) { s.Lock() s.record += fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate) s.Unlock() diff --git a/null_sink.go b/null_sink.go index 5488fa2..a790524 100644 --- a/null_sink.go +++ b/null_sink.go @@ -13,6 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive -func (s nullSink) FlushTimerWithSampleRate(name string, value float64, sampleRate float64) {} //nolint:revive +func (s nullSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) {} //nolint:revive func (s nullSink) Flush() {} diff --git a/sink.go b/sink.go index 4822b56..c34ee75 100644 --- a/sink.go +++ b/sink.go @@ -6,7 +6,7 @@ type Sink interface { FlushCounter(name string, value uint64) FlushGauge(name string, value uint64) FlushTimer(name string, value float64) - FlushTimerWithSampleRate(name string, value float64, sampleRate float64) + FlushAggregatedTimer(name string, value float64, sampleRate float64) } // FlushableSink is an extension of Sink that provides a Flush() function that diff --git a/stats.go b/stats.go index bf3a016..8657478 100644 --- a/stats.go +++ b/stats.go @@ -303,7 +303,7 @@ type timer interface { AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan - Values() []float64 + CollectedValue() []float64 SampleRate() float64 } @@ -329,7 +329,7 @@ func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *standardTimer) Values() []float64 { +func (t *standardTimer) CollectedValue() []float64 { return nil } @@ -376,7 +376,7 @@ func (t *reservoirTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *reservoirTimer) Values() []float64 { +func (t *reservoirTimer) CollectedValue() []float64 { t.mu.Lock() defer t.mu.Unlock() @@ -468,8 +468,8 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { timer := v.(timer) sampleRate := timer.SampleRate() - for _, value := range timer.Values() { - s.sink.FlushTimerWithSampleRate(key.(string), value, sampleRate) + for _, value := range timer.CollectedValue() { + s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) } s.timers.Delete(key) // todo: not sure if this cleanup is necessary From b1a2defd2834df19a0e986011247ebe65b2a215c Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Sat, 11 Jan 2025 00:02:50 -0500 Subject: [PATCH 09/32] some more clean up --- logging_sink.go | 2 +- mock/sink.go | 2 +- net_sink.go | 16 ++++++++-------- net_sink_test.go | 2 +- null_sink.go | 2 +- sink.go | 2 +- stats.go | 3 +++ 7 files changed, 16 
insertions(+), 13 deletions(-) diff --git a/logging_sink.go b/logging_sink.go index 0a407cb..5aa3659 100644 --- a/logging_sink.go +++ b/logging_sink.go @@ -92,7 +92,7 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) } -func (s *loggingSink) FlushAggregatedTimer(name string, value float64, _ float64) { +func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) { s.FlushTimer(name, value) } diff --git a/mock/sink.go b/mock/sink.go index 866e56b..1a1cdee 100644 --- a/mock/sink.go +++ b/mock/sink.go @@ -105,7 +105,7 @@ func (s *Sink) FlushTimer(name string, val float64) { // FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to // stat name. -func (s *Sink) FlushAggregatedTimer(name string, val float64, _ float64) { +func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) { s.FlushTimer(name, val) } diff --git a/net_sink.go b/net_sink.go index 1877fb0..c0b032c 100644 --- a/net_sink.go +++ b/net_sink.go @@ -260,19 +260,19 @@ func (s *netSink) FlushGauge(name string, value uint64) { } func (s *netSink) FlushTimer(name string, value float64) { - s.optimizedFloatFlush(name, value, "|ms\n") + s.flushFloatOptimized(name, "|ms\n", value) } -func (s *netSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) { +func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) { suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate) - s.optimizedFloatFlush(name, value, suffix) + s.flushFloatOptimized(name, suffix, value) } -func (s *netSink) optimizedFloatFlush(name string, value float64, suffix string) { - // Since we sometimes use floating point values (e.g. when representing time - // durations), data is often an integer encoded as a float. - // Formatting integers is much faster (>2x) than formatting - // floats so use integer formatting whenever possible. +func (s *netSink) flushFloatOptimized(name, suffix string, value float64) { + // Since we historitically used floating point values to represent time + // durations, metrics (particularly timers) are often recorded as an integer encoded as a + // float. Formatting integers is much faster (>2x) than formatting + // floats, so we should convert to an integer whenever possible. 
if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value { s.flushUint64(name, suffix, uint64(value)) } else { diff --git a/net_sink_test.go b/net_sink_test.go index 4c98f88..3e136e9 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -45,7 +45,7 @@ func (s *testStatSink) FlushTimer(name string, value float64) { s.Unlock() } -func (s *testStatSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) { +func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) { s.Lock() s.record += fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate) s.Unlock() diff --git a/null_sink.go b/null_sink.go index a790524..0a20ef3 100644 --- a/null_sink.go +++ b/null_sink.go @@ -13,6 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive -func (s nullSink) FlushAggregatedTimer(name string, value float64, sampleRate float64) {} //nolint:revive +func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive func (s nullSink) Flush() {} diff --git a/sink.go b/sink.go index c34ee75..9e1ff09 100644 --- a/sink.go +++ b/sink.go @@ -6,7 +6,7 @@ type Sink interface { FlushCounter(name string, value uint64) FlushGauge(name string, value uint64) FlushTimer(name string, value float64) - FlushAggregatedTimer(name string, value float64, sampleRate float64) + FlushAggregatedTimer(name string, value, sampleRate float64) } // FlushableSink is an extension of Sink that provides a Flush() function that diff --git a/stats.go b/stats.go index 8657478..cb61ad9 100644 --- a/stats.go +++ b/stats.go @@ -464,10 +464,13 @@ func (s *statStore) Flush() { }) settings := GetSettings() // todo: move this to some shared memory + // todo: i'm not sure not sure if we need a condition here or there's another way to assume this implicitly but since to my understanding s.timers + // will retain/store data even if it's unused in the case of standardTimer. in any case this should provide some optimization if settings.isTimerReservoirEnabled() { s.timers.Range(func(key, v interface{}) bool { timer := v.(timer) sampleRate := timer.SampleRate() + // CollectedValue() should be nil unless reservoirTimer for _, value := range timer.CollectedValue() { s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) } From 8eb942dec2c697ba4897e813fe06523c750b7878 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Mon, 13 Jan 2025 14:24:02 -0500 Subject: [PATCH 10/32] address todos --- stats.go | 63 ++++++++++++++++++++++---------------------------------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/stats.go b/stats.go index cb61ad9..6453214 100644 --- a/stats.go +++ b/stats.go @@ -214,7 +214,10 @@ type StatGenerator interface { // NewStore returns an Empty store that flushes to Sink passed as an argument. // Note: the export argument is unused. func NewStore(sink Sink, _ bool) Store { - return &statStore{sink: sink} + return &statStore{ + sink: sink, + conf: GetSettings(), // todo: right now the enviornmnet is being loaded in multiple places and this is inefficient + } } // NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer. 
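Since NewStore now snapshots the environment into conf at construction time, the reservoir has to be enabled before the store is created. A small usage sketch under that assumption (the github.com/lyft/gostats import path, metric name, and size are illustrative; the variable name and zero-disables-it behavior come from settings.go):

package main

import (
	"os"

	stats "github.com/lyft/gostats"
)

func main() {
	// A non-zero reservoir size opts timers into the reservoir path; the
	// default of 0 keeps the existing flush-per-value behaviour. The store
	// reads this once when it is constructed, so it must be set first.
	_ = os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")

	store := stats.NewDefaultStore() // per its doc comment, this already runs a flush timer
	store.NewTimer("request_time_ms").AddValue(12)
}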
@@ -330,11 +333,11 @@ func (t *standardTimer) AllocateSpan() Timespan { } func (t *standardTimer) CollectedValue() []float64 { - return nil + return nil // since we flush right away nothing will be collected } func (t *standardTimer) SampleRate() float64 { - return 0.0 // todo: using zero value of float64. the correct value would be 1.0 given 1 stat, hwoever that 1 stat is never stored, just flushed right away + return 1.0 // metrics which are not sampled have an implicit sample rate 1.0 } type reservoirTimer struct { @@ -342,7 +345,6 @@ type reservoirTimer struct { name string capacity int values []float64 - fill int // todo: the only purpose of this is to be faster than calculating len(values), is it worht it? count int mu sync.Mutex } @@ -359,16 +361,12 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - // todo: consider edge cases for < - if t.fill < t.capacity { + if t.count < t.capacity { t.values = append(t.values, value) } else { - // todo: discarding the oldest value when the reference is full, this can probably be smarter - t.values = append(t.values[1:], value) - t.fill-- + t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter } - t.fill++ t.count++ } @@ -380,14 +378,14 @@ func (t *reservoirTimer) CollectedValue() []float64 { t.mu.Lock() defer t.mu.Unlock() - // todo: Return a copy of the values slice to avoid data races - valuesCopy := make([]float64, len(t.values)) - copy(valuesCopy, t.values) - return valuesCopy + // return a copy of the values slice to avoid data races + values := make([]float64, len(t.values)) + copy(values, t.values) + return values } func (t *reservoirTimer) SampleRate() float64 { - return float64(t.fill) / float64(t.count) // todo: is it faster to store these values as float64 instead of converting here + return float64(len(t.values)) / float64(t.count) } type timespan struct { @@ -408,12 +406,14 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { type statStore struct { counters sync.Map gauges sync.Map - timers sync.Map // todo: should be control this count, especially for reservoirs we will be storing a lot of these in memory before flushing them + timers sync.Map // todo: idea how memory was managed here before did we just expect these maps to just be replaced after it's filled? mu sync.RWMutex statGenerators []StatGenerator sink Sink + + conf Settings } var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true} @@ -463,22 +463,15 @@ func (s *statStore) Flush() { return true }) - settings := GetSettings() // todo: move this to some shared memory - // todo: i'm not sure not sure if we need a condition here or there's another way to assume this implicitly but since to my understanding s.timers - // will retain/store data even if it's unused in the case of standardTimer. 
in any case this should provide some optimization - if settings.isTimerReservoirEnabled() { - s.timers.Range(func(key, v interface{}) bool { - timer := v.(timer) + s.timers.Range(func(key, v interface{}) bool { + if timer, ok := v.(*reservoirTimer); ok { sampleRate := timer.SampleRate() - // CollectedValue() should be nil unless reservoirTimer for _, value := range timer.CollectedValue() { s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) } - - s.timers.Delete(key) // todo: not sure if this cleanup is necessary - return true - }) - } + } + return true + }) flushableSink, ok := s.sink.(FlushableSink) if ok { @@ -583,18 +576,12 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { } var t timer - settings := GetSettings() // todo: move this to some shared memory - if settings.isTimerReservoirEnabled() { - // todo: if s.timers gets to a certain size, we can flush all timers and delete them from the map - // todo: no idea how memory was managed here before did we just expect the map of s.timers to just be replaced after it's filled? - - // todo: have defaults defined in a shared location + if s.conf.isTimerReservoirEnabled() { t = &reservoirTimer{ name: serializedName, base: base, - capacity: 100, - values: make([]float64, 0, 100), - fill: 0, + capacity: s.conf.TimerReservoirSize, + values: make([]float64, 0, s.conf.TimerReservoirSize), count: 0, } } else { @@ -605,7 +592,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { } } - // todo: why would the timer ever be replaced, will this hurt reservoirs or benefit them? or is it just redundant since we load above? + // todo: do we need special rules to not lose active reservoirs if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { return v.(timer) } From 5dd8757f9158c0609fa0a1fbbcb4d815ed877d67 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Mon, 13 Jan 2025 14:26:49 -0500 Subject: [PATCH 11/32] fix comment --- stats.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/stats.go b/stats.go index 6453214..d75c9cc 100644 --- a/stats.go +++ b/stats.go @@ -216,7 +216,7 @@ type StatGenerator interface { func NewStore(sink Sink, _ bool) Store { return &statStore{ sink: sink, - conf: GetSettings(), // todo: right now the enviornmnet is being loaded in multiple places and this is inefficient + conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient } } @@ -404,9 +404,10 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { } type statStore struct { + // todo: no idea how memory was managed here, when is are the entries ever gc'd? counters sync.Map gauges sync.Map - timers sync.Map // todo: idea how memory was managed here before did we just expect these maps to just be replaced after it's filled? + timers sync.Map mu sync.RWMutex statGenerators []StatGenerator From 0d3fb45c46d167742ebedc2a1ba7b3ae65d9a6c2 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Mon, 13 Jan 2025 14:57:32 -0500 Subject: [PATCH 12/32] ensure memory and flush management for timers --- stats.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/stats.go b/stats.go index d75c9cc..70a582b 100644 --- a/stats.go +++ b/stats.go @@ -404,7 +404,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { } type statStore struct { - // todo: no idea how memory was managed here, when is are the entries ever gc'd? 
+ // todo: no idea how memory is managed here, when are the map entries ever deleted? counters sync.Map gauges sync.Map timers sync.Map @@ -451,6 +451,8 @@ func (s *statStore) Flush() { } s.mu.RUnlock() + // todo: if we're not deleting the data we flush from these maps, won't we just keep resending them? + s.counters.Range(func(key, v interface{}) bool { // do not flush counters that are set to zero if value := v.(*counter).latch(); value != 0 { @@ -470,7 +472,9 @@ func (s *statStore) Flush() { for _, value := range timer.CollectedValue() { s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) } + s.timers.Delete(key) // delete it from the map so it's not flushed again } + return true }) @@ -593,7 +597,6 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { } } - // todo: do we need special rules to not lose active reservoirs if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { return v.(timer) } From ea5ae6aacd66b6e97c948b2b2d7c2f7d833135a8 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Tue, 14 Jan 2025 18:22:27 -0500 Subject: [PATCH 13/32] optimize reservoirTimer by utilizing a ring buffer optimize reservoirTimer by allocating an reusing memory fix bug with sampleRate calculation optimize FlushAggregatedTimer a bit expand and fix test coverage --- net_sink.go | 3 +- stats.go | 63 +++++++++++++------ stats_test.go | 166 +++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 204 insertions(+), 28 deletions(-) diff --git a/net_sink.go b/net_sink.go index c0b032c..73bcfa8 100644 --- a/net_sink.go +++ b/net_sink.go @@ -264,7 +264,8 @@ func (s *netSink) FlushTimer(name string, value float64) { } func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) { - suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate) + // todo: this can be further optimized by strconv.AppendFloat directly to the buffer in flush(Uint|Float)64 however we would need more conditions or code duplication + suffix := "|ms|@" + strconv.FormatFloat(sampleRate, 'f', 2, 64) + "\n" // todo: deteremine how many decimal places we need s.flushFloatOptimized(name, suffix, value) } diff --git a/stats.go b/stats.go index 70a582b..5403182 100644 --- a/stats.go +++ b/stats.go @@ -2,6 +2,7 @@ package stats import ( "context" + "math/bits" "strconv" "sync" "sync/atomic" @@ -216,7 +217,7 @@ type StatGenerator interface { func NewStore(sink Sink, _ bool) Store { return &statStore{ sink: sink, - conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient + conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets } } @@ -306,6 +307,7 @@ type timer interface { AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan + ResetValue(int, float64) CollectedValue() []float64 SampleRate() float64 } @@ -332,6 +334,8 @@ func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } +func (t *standardTimer) ResetValue(_ int, _ float64) {} + func (t *standardTimer) CollectedValue() []float64 { return nil // since we flush right away nothing will be collected } @@ -341,12 +345,14 @@ func (t *standardTimer) SampleRate() float64 { } type reservoirTimer struct { + mu sync.Mutex base time.Duration name string - capacity int + ringSize int + ringMask int values []float64 count int - mu sync.Mutex + overflow int } func (t *reservoirTimer) time(dur 
time.Duration) { @@ -361,10 +367,11 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - if t.count < t.capacity { - t.values = append(t.values, value) - } else { - t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter + t.values[t.overflow&t.ringMask] = value + t.overflow++ + + if t.overflow == t.ringSize { + t.overflow = 0 } t.count++ @@ -374,18 +381,31 @@ func (t *reservoirTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } +func (t *reservoirTimer) ResetValue(index int, value float64) { + t.mu.Lock() + defer t.mu.Unlock() + + // todo: we mistakingly record timers as floats, if we could confidently make this an int we optimize the reset with some bitwise/xor + if t.values[index] == value { + t.values[index] = 0 + } +} + func (t *reservoirTimer) CollectedValue() []float64 { t.mu.Lock() defer t.mu.Unlock() // return a copy of the values slice to avoid data races - values := make([]float64, len(t.values)) + values := make([]float64, t.ringSize) // todo: is it worth it to use t.ringSize instead of computing len of values worth it? copy(values, t.values) return values } func (t *reservoirTimer) SampleRate() float64 { - return float64(len(t.values)) / float64(t.count) + if t.count <= t.ringSize { + return 1.0 + } + return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it? } type timespan struct { @@ -404,7 +424,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { } type statStore struct { - // todo: no idea how memory is managed here, when are the map entries ever deleted? + // slots in this maps are reused as stats names are stable over the lifetime of the process counters sync.Map gauges sync.Map timers sync.Map @@ -451,8 +471,6 @@ func (s *statStore) Flush() { } s.mu.RUnlock() - // todo: if we're not deleting the data we flush from these maps, won't we just keep resending them? 
- s.counters.Range(func(key, v interface{}) bool { // do not flush counters that are set to zero if value := v.(*counter).latch(); value != 0 { @@ -469,10 +487,17 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { sampleRate := timer.SampleRate() - for _, value := range timer.CollectedValue() { - s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) + if sampleRate == 0.0 { + return true // todo: may provide reduce some processing but not sure if it's worth the complexity + } + + for i, value := range timer.CollectedValue() { + // todo: i think i need to preprocess what we flush as skipping should affect the sample rate + if value != 0.0 { + s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) + timer.ResetValue(i, value) + } } - s.timers.Delete(key) // delete it from the map so it's not flushed again } return true @@ -582,12 +607,14 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { var t timer if s.conf.isTimerReservoirEnabled() { + capacity := s.conf.TimerReservoirSize + capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity)) t = &reservoirTimer{ name: serializedName, base: base, - capacity: s.conf.TimerReservoirSize, - values: make([]float64, 0, s.conf.TimerReservoirSize), - count: 0, + ringSize: capacity, + ringMask: capacityRoundedToTheNextPowerOfTwo - 1, + values: make([]float64, capacity), } } else { t = &standardTimer{ diff --git a/stats_test.go b/stats_test.go index aea63f8..20a651f 100644 --- a/stats_test.go +++ b/stats_test.go @@ -151,15 +151,24 @@ func TestTimerReservoir_Disabled(t *testing.T) { time.Sleep(1001 * time.Millisecond) - statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer if statCount != expectedStatCount { - t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + if strings.Contains(value, "|@") { + t.Errorf("A stat was written with a sample rate when it shouldn't have any: %s", stat) + } } os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") } -func TestTimerReservoir(t *testing.T) { +func TestTimerReservoir_Overflow(t *testing.T) { err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") if err != nil { t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) @@ -171,7 +180,7 @@ func TestTimerReservoir(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test").AddValue(float64(i % 10)) + store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic } if ts.String() != "" { @@ -182,9 +191,101 @@ func TestTimerReservoir(t *testing.T) { time.Sleep(1001 * time.Millisecond) - statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer if statCount != expectedStatCount { - t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) + t.Errorf("Not 
all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "0.10" { + t.Errorf("A stat was written without a 0.10 sample rate: %s", stat) + } + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + +func TestTimerReservoir_Full(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + } + + expectedStatCount := 100 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 100; i++ { + store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("A stat was written without a 1.00 sample rate: %s", stat) + } + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + +func TestTimerReservoir_NotFull(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + } + + expectedStatCount := 50 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 50; i++ { + store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("A stat was written without a 1.00 sample rate: %s", stat) + } } os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") @@ -202,7 +303,7 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10) + 1) // use different names so that we don't conflate the metrics into the same reservoir } if ts.String() != "" { @@ -213,9 +314,56 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { time.Sleep(1001 * time.Millisecond) - statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line 
character at the end of the buffer + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer if statCount != expectedStatCount { - t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount) + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("A stat was written without a 1.00 sample rate: %s", stat) + } + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + +func TestTimerReservoir_FilteredZeros(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + } + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 1000; i++ { + store.NewTimer("test").AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") + } + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats := strings.Split(ts.String(), "\n") + stats = stats[:len(stats)-1] // there will be 1 extra new line character at the end of the buffer + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + value = strings.Split(value, ("|ms"))[0] // strip value and remove suffix and get raw number + if value == "0" { + t.Errorf("A stat was written with a zero value: %s", stat) + } + if sampleRate != "0.10" { + t.Errorf("A stat was written without a 0.10 sample rate: %s", stat) + } } os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") From e81d603dbad94e0eac4e1ed167a54a647735dce2 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Tue, 14 Jan 2025 19:50:56 -0500 Subject: [PATCH 14/32] correct how we flush reusable timer entries --- stats.go | 62 ++++++++++++++++++++++++++++++--------------------- stats_test.go | 45 ++++--------------------------------- 2 files changed, 41 insertions(+), 66 deletions(-) diff --git a/stats.go b/stats.go index 5403182..087db42 100644 --- a/stats.go +++ b/stats.go @@ -307,9 +307,10 @@ type timer interface { AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan - ResetValue(int, float64) - CollectedValue() []float64 + GetValue(int) float64 + ValueCount() int SampleRate() float64 + Reset() } type standardTimer struct { @@ -334,16 +335,21 @@ func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *standardTimer) ResetValue(_ int, _ float64) {} +func (t *standardTimer) GetValue(_ int) float64 { + return 0.0 // since we flush right away nothing will be collected +} -func (t *standardTimer) CollectedValue() []float64 { - return nil // since we flush right away nothing will be collected +func (t *standardTimer) ValueCount() int { + return 0 // since we flush right away nothing will be collected } func (t *standardTimer) SampleRate() float64 { return 1.0 // metrics which are not sampled have an implicit sample rate 1.0 } +// nothing to persisted in memroy for this timer +func (t *standardTimer) Reset() {} + type reservoirTimer struct { mu sync.Mutex base time.Duration @@ -370,6 +376,7 @@ func (t *reservoirTimer) AddValue(value float64) { t.values[t.overflow&t.ringMask] = value t.overflow++ + // todo: 
can i optimize this with xor? if t.overflow == t.ringSize { t.overflow = 0 } @@ -381,33 +388,42 @@ func (t *reservoirTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *reservoirTimer) ResetValue(index int, value float64) { +func (t *reservoirTimer) GetValue(index int) float64 { t.mu.Lock() defer t.mu.Unlock() - // todo: we mistakingly record timers as floats, if we could confidently make this an int we optimize the reset with some bitwise/xor - if t.values[index] == value { - t.values[index] = 0 - } + return t.values[index] } -func (t *reservoirTimer) CollectedValue() []float64 { - t.mu.Lock() +func (t *reservoirTimer) ValueCount() int { + t.mu.Lock() // todo: could probably convert locks like this to atomic.LoadUint64 defer t.mu.Unlock() - // return a copy of the values slice to avoid data races - values := make([]float64, t.ringSize) // todo: is it worth it to use t.ringSize instead of computing len of values worth it? - copy(values, t.values) - return values + if t.count > t.ringSize { + return t.ringSize + } + return t.count } func (t *reservoirTimer) SampleRate() float64 { + t.mu.Lock() + defer t.mu.Unlock() + + // todo: a 0 count should probably not be a 1.0 sample rate if t.count <= t.ringSize { return 1.0 } return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it? } +func (t *reservoirTimer) Reset() { + t.mu.Lock() + defer t.mu.Unlock() + + t.count = 0 // this will imply a 0.0 sample rate until it's increased + t.overflow = 0 +} + type timespan struct { timer timer start time.Time @@ -487,17 +503,13 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { sampleRate := timer.SampleRate() - if sampleRate == 0.0 { - return true // todo: may provide reduce some processing but not sure if it's worth the complexity - } - for i, value := range timer.CollectedValue() { - // todo: i think i need to preprocess what we flush as skipping should affect the sample rate - if value != 0.0 { - s.sink.FlushAggregatedTimer(key.(string), value, sampleRate) - timer.ResetValue(i, value) - } + // since the map memory is reused only process how we accumulated in the current processing itteration + for i := 0; i < timer.ValueCount(); i++ { + s.sink.FlushAggregatedTimer(key.(string), timer.GetValue(i), sampleRate) } + + timer.Reset() // todo: need to add test coverage for a reused map } return true diff --git a/stats_test.go b/stats_test.go index 20a651f..dea20a4 100644 --- a/stats_test.go +++ b/stats_test.go @@ -180,7 +180,7 @@ func TestTimerReservoir_Overflow(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic + store.NewTimer("test").AddValue(float64(i % 10)) } if ts.String() != "" { @@ -221,7 +221,7 @@ func TestTimerReservoir_Full(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 100; i++ { - store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic + store.NewTimer("test").AddValue(float64(i % 10)) } if ts.String() != "" { @@ -262,7 +262,7 @@ func TestTimerReservoir_NotFull(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 50; i++ { - store.NewTimer("test").AddValue(float64(i%10) + 1) // don't create timers with 0 values to make the count deterministic + store.NewTimer("test").AddValue(float64(i % 10)) } 
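// Illustrative standalone sketch (not part of this patch) of the fixed-size
// reservoir the tests above exercise: writes land at count & mask, so a
// power-of-two capacity replaces a modulo, and once more values arrive than
// there are slots the implied sample rate becomes capacity/count. The real
// timer also guards these fields with a mutex, omitted here for brevity.
package main

import "fmt"

type ring struct {
	values []float64
	mask   uint64
	count  uint64
}

// newRing assumes size is a power of two so that size-1 works as a bit mask.
func newRing(size uint64) *ring {
	return &ring{values: make([]float64, size), mask: size - 1}
}

func (r *ring) add(v float64) {
	r.values[r.count&r.mask] = v // overwrites the oldest slot once the ring is full
	r.count++
}

func (r *ring) sampleRate() float64 {
	if r.count <= uint64(len(r.values)) {
		return 1.0 // nothing was dropped, the kept values are an exact record
	}
	return float64(len(r.values)) / float64(r.count)
}

func main() {
	r := newRing(128)
	for i := 0; i < 3*128; i++ {
		r.add(float64(i % 10))
	}
	fmt.Printf("%.2f\n", r.sampleRate()) // 128 kept of 384 seen -> 0.33
}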
if ts.String() != "" { @@ -303,7 +303,7 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { store := NewStore(sink, true) for i := 0; i < 1000; i++ { - store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10) + 1) // use different names so that we don't conflate the metrics into the same reservoir + store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir } if ts.String() != "" { @@ -332,43 +332,6 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") } -func TestTimerReservoir_FilteredZeros(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") - if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) - } - - ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) - - for i := 0; i < 1000; i++ { - store.NewTimer("test").AddValue(float64(i % 10)) - } - - if ts.String() != "" { - t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") - } - store.Flush() - - time.Sleep(1001 * time.Millisecond) - - stats := strings.Split(ts.String(), "\n") - stats = stats[:len(stats)-1] // there will be 1 extra new line character at the end of the buffer - for _, stat := range stats { - value := strings.Split(stat, ":")[1] - sampleRate := strings.Split(value, ("|@"))[1] - value = strings.Split(value, ("|ms"))[0] // strip value and remove suffix and get raw number - if value == "0" { - t.Errorf("A stat was written with a zero value: %s", stat) - } - if sampleRate != "0.10" { - t.Errorf("A stat was written without a 0.10 sample rate: %s", stat) - } - } - - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") -} - // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { sink := &testStatSink{} From 74a26a10c9f0095fbd851d0eed991319b2496dd9 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Tue, 14 Jan 2025 20:10:30 -0500 Subject: [PATCH 15/32] add test for reused timer map after flushing --- net_util_test.go | 8 ++++++ stats.go | 6 ++-- stats_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/net_util_test.go b/net_util_test.go index 13ac2b0..0e567e3 100644 --- a/net_util_test.go +++ b/net_util_test.go @@ -272,6 +272,14 @@ func (s *netTestSink) String() string { return str } +func (s *netTestSink) Pull() string { + s.mu.Lock() + str := s.buf.String() + s.buf.Reset() + s.mu.Unlock() + return str +} + func (s *netTestSink) Host(t testing.TB) string { t.Helper() host, _, err := net.SplitHostPort(s.conn.Address().String()) diff --git a/stats.go b/stats.go index 087db42..7756cb5 100644 --- a/stats.go +++ b/stats.go @@ -440,7 +440,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { } type statStore struct { - // slots in this maps are reused as stats names are stable over the lifetime of the process + // these maps may grow indefinitely however slots in this maps are reused as stats names are stable over the lifetime of the process counters sync.Map gauges sync.Map timers sync.Map @@ -504,12 +504,12 @@ func (s *statStore) Flush() { if timer, ok := v.(*reservoirTimer); ok { sampleRate := timer.SampleRate() - // since the map memory is reused only process how we accumulated in the current processing itteration + // since the map memory is reused only process what we accumulated in the current processing itteration for i := 0; i < 
timer.ValueCount(); i++ { s.sink.FlushAggregatedTimer(key.(string), timer.GetValue(i), sampleRate) } - timer.Reset() // todo: need to add test coverage for a reused map + timer.Reset() } return true diff --git a/stats_test.go b/stats_test.go index dea20a4..58faf56 100644 --- a/stats_test.go +++ b/stats_test.go @@ -332,6 +332,80 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") } +func TestTimerReservoir_ReusedStore(t *testing.T) { + err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + if err != nil { + t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + } + + expectedStatCount := 100 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := NewStore(sink, true) + + for i := 0; i < 100; i++ { + store.NewTimer("test").AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats := strings.Split(ts.Pull(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("A stat was written without a 1.00 sample rate: %s", stat) + } + } + + if ts.String() != "" { + t.Errorf("Sink hasn't been cleared") + } + + expectedStatCount = 50 + + for i := 0; i < 50; i++ { + store.NewTimer("test").AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush potentially clearing the reservoir too early") + } + + store.Flush() + + time.Sleep(1001 * time.Millisecond) + + stats = strings.Split(ts.Pull(), "\n") + statCount = len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("A stat was written without a 1.00 sample rate: %s", stat) + } + } + + os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") +} + // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { sink := &testStatSink{} From 6d2687cbc96f152bd46943a1af02a115e2dd958d Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 16:02:25 -0500 Subject: [PATCH 16/32] correct the ring buffer implementation to utilize bitwise benefits additionally remove variablility of the reservoir size as we need to ensure this number is a power of two --- settings.go | 14 ++++++-------- stats.go | 26 +++++++------------------- stats_test.go | 45 +++++++++++++++++++++++---------------------- 3 files changed, 36 insertions(+), 49 deletions(-) diff --git a/settings.go b/settings.go index 3a4f268..454a447 100644 --- a/settings.go +++ b/settings.go @@ -20,7 +20,8 @@ const ( DefaultFlushIntervalS = 5 // DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false. 
DefaultLoggingSinkDisabled = false - DefaultTimerReservoirSize = 0 + DefaultUseReservoirTimer = false // DefaultUseReservoirTimer defines if reservoir timers should be used by default, default is false. + DefaultTimerReservoirSize = 128 // DefaultTimerReservoirSize is the max capacity of the reservoir for reservoir timers. needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 ) // The Settings type is used to configure gostats. gostats uses environment @@ -39,7 +40,8 @@ type Settings struct { // Disable the LoggingSink when USE_STATSD is false and use the NullSink instead. // This will cause all stats to be silently dropped. LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"` - TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"` + // Enable all timers to act as reservoir timers with sampling + UseReservoirTimer bool `envconfig:"GOSTATS_USE_RESERVOIR_TIMER" default:"false"` } // An envError is an error that occurred parsing an environment variable @@ -103,7 +105,7 @@ func GetSettings() Settings { if err != nil { panic(err) } - timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize) + useReservoirTimer, err := envBool("GOSTATS_USE_RESERVOIR_TIMER", DefaultUseReservoirTimer) if err != nil { panic(err) } @@ -114,7 +116,7 @@ func GetSettings() Settings { StatsdPort: statsdPort, FlushIntervalS: flushIntervalS, LoggingSinkDisabled: loggingSinkDisabled, - TimerReservoirSize: timerReservoirSize, + UseReservoirTimer: useReservoirTimer, } } @@ -122,7 +124,3 @@ func GetSettings() Settings { func (s *Settings) FlushInterval() time.Duration { return time.Duration(s.FlushIntervalS) * time.Second } - -func (s *Settings) isTimerReservoirEnabled() bool { - return s.TimerReservoirSize > 0 -} diff --git a/stats.go b/stats.go index 7756cb5..5ca8a6d 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,6 @@ package stats import ( "context" - "math/bits" "strconv" "sync" "sync/atomic" @@ -354,11 +353,10 @@ type reservoirTimer struct { mu sync.Mutex base time.Duration name string - ringSize int + ringSize int // just used so that we don't have to re-evaluate capacity of values ringMask int values []float64 count int - overflow int } func (t *reservoirTimer) time(dur time.Duration) { @@ -373,14 +371,7 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - t.values[t.overflow&t.ringMask] = value - t.overflow++ - - // todo: can i optimize this with xor? - if t.overflow == t.ringSize { - t.overflow = 0 - } - + t.values[t.count&t.ringMask] = value t.count++ } @@ -413,7 +404,7 @@ func (t *reservoirTimer) SampleRate() float64 { if t.count <= t.ringSize { return 1.0 } - return float64(t.ringSize) / float64(t.count) // todo: is it worth it to use t.ringSize instead of computing len of values worth it? 
+ return float64(t.ringSize) / float64(t.count) } func (t *reservoirTimer) Reset() { @@ -421,7 +412,6 @@ func (t *reservoirTimer) Reset() { defer t.mu.Unlock() t.count = 0 // this will imply a 0.0 sample rate until it's increased - t.overflow = 0 } type timespan struct { @@ -618,15 +608,13 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { } var t timer - if s.conf.isTimerReservoirEnabled() { - capacity := s.conf.TimerReservoirSize - capacityRoundedToTheNextPowerOfTwo := 1 << bits.Len(uint(capacity)) + if s.conf.UseReservoirTimer { t = &reservoirTimer{ name: serializedName, base: base, - ringSize: capacity, - ringMask: capacityRoundedToTheNextPowerOfTwo - 1, - values: make([]float64, capacity), + ringSize: DefaultTimerReservoirSize, + ringMask: DefaultTimerReservoirSize - 1, + values: make([]float64, DefaultTimerReservoirSize), } } else { t = &standardTimer{ diff --git a/stats_test.go b/stats_test.go index 58faf56..23b693f 100644 --- a/stats_test.go +++ b/stats_test.go @@ -129,9 +129,9 @@ func TestMilliTimer(t *testing.T) { } func TestTimerReservoir_Disabled(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "false") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } expectedStatCount := 1000 @@ -165,21 +165,22 @@ func TestTimerReservoir_Disabled(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } func TestTimerReservoir_Overflow(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 100 + expectedStatCount := 128 // reservoir size ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 1000; i++ { + // this should equate to a 0.1 sample rate; 0.1 * 1280 = 128 + for i := 0; i < 1280; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -206,21 +207,21 @@ func TestTimerReservoir_Overflow(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } func TestTimerReservoir_Full(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 100 + expectedStatCount := 128 // reservoir size ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 100; i++ { + for i := 0; i < 128; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -247,13 +248,13 @@ func TestTimerReservoir_Full(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } func TestTimerReservoir_NotFull(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set 
GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } expectedStatCount := 50 @@ -288,13 +289,13 @@ func TestTimerReservoir_NotFull(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } func TestTimerReservoir_IndependantReservoirs(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } expectedStatCount := 1000 @@ -329,13 +330,13 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } func TestTimerReservoir_ReusedStore(t *testing.T) { - err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100") + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") if err != nil { - t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err) + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } expectedStatCount := 100 @@ -403,7 +404,7 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { } } - os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE") + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } // Ensure 0 counters are not flushed From d067744bd3e3cccaf4c28825c28c6fbf7dd72ca6 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 17:13:44 -0500 Subject: [PATCH 17/32] improve reservoirTimer property access --- stats.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/stats.go b/stats.go index 5ca8a6d..97d3423 100644 --- a/stats.go +++ b/stats.go @@ -353,10 +353,10 @@ type reservoirTimer struct { mu sync.Mutex base time.Duration name string - ringSize int // just used so that we don't have to re-evaluate capacity of values - ringMask int + ringSize uint64 // just used so that we don't have to re-evaluate capacity of values + ringMask uint64 values []float64 - count int + count uint64 } func (t *reservoirTimer) time(dur time.Duration) { @@ -371,6 +371,7 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() + // direct access to t.count and t.ringMask is protected by the mutex t.values[t.count&t.ringMask] = value t.count++ } @@ -387,31 +388,27 @@ func (t *reservoirTimer) GetValue(index int) float64 { } func (t *reservoirTimer) ValueCount() int { - t.mu.Lock() // todo: could probably convert locks like this to atomic.LoadUint64 - defer t.mu.Unlock() + count := atomic.LoadUint64(&t.count) + ringSize := atomic.LoadUint64(&t.ringSize) - if t.count > t.ringSize { - return t.ringSize + if count > ringSize { + return int(ringSize) } - return t.count + return int(count) } func (t *reservoirTimer) SampleRate() float64 { - t.mu.Lock() - defer t.mu.Unlock() + count := atomic.LoadUint64(&t.count) + ringSize := atomic.LoadUint64(&t.ringSize) - // todo: a 0 count should probably not be a 1.0 sample rate - if t.count <= t.ringSize { + if count <= ringSize { return 1.0 } - return float64(t.ringSize) / float64(t.count) + return float64(ringSize) / float64(count) } func (t *reservoirTimer) Reset() { - t.mu.Lock() - defer t.mu.Unlock() - - t.count = 0 // this will imply a 0.0 sample rate until it's increased + atomic.StoreUint64(&t.count, 0) } type timespan struct { From 7e5a451fbcb972863ee96509deebb155bd2e3eab Mon Sep 17 00:00:00 2001 From: Matthew 
Kuzminski Date: Wed, 15 Jan 2025 19:11:19 -0500 Subject: [PATCH 18/32] make reservoir tests more dynamic improve comments --- settings.go | 8 ++++++-- stats.go | 6 +++--- stats_test.go | 39 +++++++++++++++++++++++---------------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/settings.go b/settings.go index 454a447..aa83d28 100644 --- a/settings.go +++ b/settings.go @@ -20,8 +20,12 @@ const ( DefaultFlushIntervalS = 5 // DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false. DefaultLoggingSinkDisabled = false - DefaultUseReservoirTimer = false // DefaultUseReservoirTimer defines if reservoir timers should be used by default, default is false. - DefaultTimerReservoirSize = 128 // DefaultTimerReservoirSize is the max capacity of the reservoir for reservoir timers. needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 + // DefaultUseReservoirTimer defines if reservoir timers should be used by default, default is false. + DefaultUseReservoirTimer = false + // FixedTimerReservoirSize is the max capacity of the reservoir for reservoir timers. + // note: needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 + // todo: see if we can use not-strict number and just account for the offset + FixedTimerReservoirSize = 128 ) // The Settings type is used to configure gostats. gostats uses environment diff --git a/stats.go b/stats.go index 97d3423..295c697 100644 --- a/stats.go +++ b/stats.go @@ -609,9 +609,9 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { t = &reservoirTimer{ name: serializedName, base: base, - ringSize: DefaultTimerReservoirSize, - ringMask: DefaultTimerReservoirSize - 1, - values: make([]float64, DefaultTimerReservoirSize), + ringSize: FixedTimerReservoirSize, + ringMask: FixedTimerReservoirSize - 1, + values: make([]float64, FixedTimerReservoirSize), } } else { t = &standardTimer{ diff --git a/stats_test.go b/stats_test.go index 23b693f..bb39509 100644 --- a/stats_test.go +++ b/stats_test.go @@ -134,12 +134,13 @@ func TestTimerReservoir_Disabled(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 1000 + statsToSend := FixedTimerReservoirSize * 3 + expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 1000; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -174,13 +175,14 @@ func TestTimerReservoir_Overflow(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 128 // reservoir size + statsToSend := FixedTimerReservoirSize * 3 + expectedStatCount := FixedTimerReservoirSize ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) // this should equate to a 0.1 sample rate; 0.1 * 1280 = 128 - for i := 0; i < 1280; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -202,8 +204,8 @@ func TestTimerReservoir_Overflow(t *testing.T) { for _, stat := range stats { value := strings.Split(stat, ":")[1] sampleRate := strings.Split(value, ("|@"))[1] - if sampleRate != "0.10" { - t.Errorf("A stat was written without a 0.10 sample rate: %s", stat) + if sampleRate != "0.33" { + t.Errorf("A stat was written without a 0.33 sample rate: %s", stat) } } @@ -216,12 +218,13 @@ func TestTimerReservoir_Full(t *testing.T) { t.Fatalf("Failed to set 
GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 128 // reservoir size + statsToSend := FixedTimerReservoirSize + expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 128; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -257,12 +260,13 @@ func TestTimerReservoir_NotFull(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 50 + statsToSend := FixedTimerReservoirSize / 2 + expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 50; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -298,12 +302,13 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 1000 + statsToSend := FixedTimerReservoirSize * 3 + expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 1000; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir } @@ -339,12 +344,13 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - expectedStatCount := 100 + statsToSend := FixedTimerReservoirSize / 2 + expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) store := NewStore(sink, true) - for i := 0; i < 100; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -375,9 +381,10 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { t.Errorf("Sink hasn't been cleared") } - expectedStatCount = 50 + statsToSend = FixedTimerReservoirSize + expectedStatCount = statsToSend - for i := 0; i < 50; i++ { + for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } From a54db1a78d4f20ce1acd420ce81fd7ea4c656545 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 19:15:36 -0500 Subject: [PATCH 19/32] improve comments --- settings.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/settings.go b/settings.go index aa83d28..23479f0 100644 --- a/settings.go +++ b/settings.go @@ -20,7 +20,7 @@ const ( DefaultFlushIntervalS = 5 // DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false. DefaultLoggingSinkDisabled = false - // DefaultUseReservoirTimer defines if reservoir timers should be used by default, default is false. + // DefaultUseReservoirTimer defines if all timers should be reservoir timers by default. DefaultUseReservoirTimer = false // FixedTimerReservoirSize is the max capacity of the reservoir for reservoir timers. // note: needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 @@ -44,7 +44,7 @@ type Settings struct { // Disable the LoggingSink when USE_STATSD is false and use the NullSink instead. // This will cause all stats to be silently dropped. 
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"` - // Enable all timers to act as reservoir timers with sampling + // Make all timers reservoir timers with implied sampling under flush interval of FlushIntervalS UseReservoirTimer bool `envconfig:"GOSTATS_USE_RESERVOIR_TIMER" default:"false"` } From 18c0e5769f371b350a309decd7d928fe6716f083 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 20:02:19 -0500 Subject: [PATCH 20/32] optimize reservoir timer flush --- logging_sink.go | 2 +- mock/sink.go | 4 ++-- net_sink.go | 53 +++++++++++++++++++++++++++++++++++------------- net_sink_test.go | 2 +- null_sink.go | 2 +- sink.go | 2 +- stats.go | 2 +- 7 files changed, 46 insertions(+), 21 deletions(-) diff --git a/logging_sink.go b/logging_sink.go index 5aa3659..04c3aa8 100644 --- a/logging_sink.go +++ b/logging_sink.go @@ -92,7 +92,7 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) } -func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) { +func (s *loggingSink) FlushSampledTimer(name string, value, _ float64) { s.FlushTimer(name, value) } diff --git a/mock/sink.go b/mock/sink.go index 1a1cdee..7584dae 100644 --- a/mock/sink.go +++ b/mock/sink.go @@ -103,9 +103,9 @@ func (s *Sink) FlushTimer(name string, val float64) { atomic.AddInt64(&p.count, 1) } -// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to +// FlushSampledTimer implements the stats.Sink.FlushSampledTimer method and adds val to // stat name. -func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) { +func (s *Sink) FlushSampledTimer(name string, val, _ float64) { s.FlushTimer(name, val) } diff --git a/net_sink.go b/net_sink.go index 73bcfa8..1d25831 100644 --- a/net_sink.go +++ b/net_sink.go @@ -260,25 +260,46 @@ func (s *netSink) FlushGauge(name string, value uint64) { } func (s *netSink) FlushTimer(name string, value float64) { - s.flushFloatOptimized(name, "|ms\n", value) + // Since we mistakenly use floating point values to represent time + // durations this method is often passed an integer encoded as a + // float. Formatting integers is much faster (>2x) than formatting + // floats so use integer formatting whenever possible. + // + if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value { + s.flushUint64(name, "|ms\n", uint64(value)) + } else { + s.flushFloat64(name, "|ms\n", value) + } } -func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) { - // todo: this can be further optimized by strconv.AppendFloat directly to the buffer in flush(Uint|Float)64 however we would need more conditions or code duplication - suffix := "|ms|@" + strconv.FormatFloat(sampleRate, 'f', 2, 64) + "\n" // todo: deteremine how many decimal places we need - s.flushFloatOptimized(name, suffix, value) -} +func (s *netSink) FlushSampledTimer(name string, value, sampleRate float64) { + timerSuffix := "|ms" + sampleSuffix := "|@" + metricSuffix := "\n" -func (s *netSink) flushFloatOptimized(name, suffix string, value float64) { - // Since we historitically used floating point values to represent time - // durations, metrics (particularly timers) are often recorded as an integer encoded as a - // float. Formatting integers is much faster (>2x) than formatting - // floats, so we should convert to an integer whenever possible. 
+ // todo: see if we can dedup code here without the expense of efficiency + var writeValue func(*buffer) if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value { - s.flushUint64(name, suffix, uint64(value)) + writeValue = func(b *buffer) { b.WriteUnit64(uint64(value)) } } else { - s.flushFloat64(name, suffix, value) + writeValue = func(b *buffer) { b.WriteFloat64(value) } } + + b := pbFree.Get().(*buffer) + + b.WriteString(name) + b.WriteChar(':') + writeValue(b) + b.WriteString(timerSuffix) + + b.WriteString(sampleSuffix) + b.writeFloat64WithPercision(sampleRate, 2) // todo: deteremine how many decimal places we need + b.WriteString(metricSuffix) + + s.writeBuffer(b) + + b.Reset() + pbFree.Put(b) } func (s *netSink) run() { @@ -426,5 +447,9 @@ func (b *buffer) WriteUnit64(val uint64) { } func (b *buffer) WriteFloat64(val float64) { - *b = strconv.AppendFloat(*b, val, 'f', 6, 64) + b.writeFloat64WithPercision(val, 6) +} + +func (b *buffer) writeFloat64WithPercision(val float64, precision int) { + *b = strconv.AppendFloat(*b, val, 'f', precision, 64) } diff --git a/net_sink_test.go b/net_sink_test.go index 3e136e9..28897e0 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -45,7 +45,7 @@ func (s *testStatSink) FlushTimer(name string, value float64) { s.Unlock() } -func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) { +func (s *testStatSink) FlushSampledTimer(name string, value, sampleRate float64) { s.Lock() s.record += fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate) s.Unlock() diff --git a/null_sink.go b/null_sink.go index 0a20ef3..473a50d 100644 --- a/null_sink.go +++ b/null_sink.go @@ -13,6 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive -func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive +func (s nullSink) FlushSampledTimer(name string, value, sampleRate float64) {} //nolint:revive func (s nullSink) Flush() {} diff --git a/sink.go b/sink.go index 9e1ff09..956b0df 100644 --- a/sink.go +++ b/sink.go @@ -6,7 +6,7 @@ type Sink interface { FlushCounter(name string, value uint64) FlushGauge(name string, value uint64) FlushTimer(name string, value float64) - FlushAggregatedTimer(name string, value, sampleRate float64) + FlushSampledTimer(name string, value, sampleRate float64) } // FlushableSink is an extension of Sink that provides a Flush() function that diff --git a/stats.go b/stats.go index 295c697..15c58de 100644 --- a/stats.go +++ b/stats.go @@ -493,7 +493,7 @@ func (s *statStore) Flush() { // since the map memory is reused only process what we accumulated in the current processing itteration for i := 0; i < timer.ValueCount(); i++ { - s.sink.FlushAggregatedTimer(key.(string), timer.GetValue(i), sampleRate) + s.sink.FlushSampledTimer(key.(string), timer.GetValue(i), sampleRate) } timer.Reset() From bf0ef6356922b23e7e33e377e9191a31ec12f8a3 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 23:44:34 -0500 Subject: [PATCH 21/32] block never flush edge cases when stores are constructed outside of NewDefaultStore and resevoir timers are enabled implicitly optimize store construction --- net_sink.go | 6 +++--- settings.go | 6 +++++- stats.go | 42 ++++++++++++++++++++++++++++++------------ stats_test.go | 14 ++++++++------ 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/net_sink.go b/net_sink.go index 1d25831..e0fde54 100644 --- a/net_sink.go 
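// Illustrative standalone sketch (not part of this patch) of the line format the
// sampled-timer flush is expected to produce, "<name>:<value>|ms|@<rate>", with
// the sample rate kept to two decimal places as the tests assert. It uses plain
// strconv/fmt instead of the sink's pooled buffer; formatSampledTimer is a
// hypothetical helper, not an API of this package.
package main

import (
	"fmt"
	"strconv"
)

func formatSampledTimer(name string, value, sampleRate float64) string {
	// mirror the sink's fast path: integer-valued durations are written without
	// a decimal point, everything else falls back to float formatting
	v := strconv.FormatFloat(value, 'f', -1, 64)
	if value == float64(int64(value)) {
		v = strconv.FormatInt(int64(value), 10)
	}
	return name + ":" + v + "|ms|@" + strconv.FormatFloat(sampleRate, 'f', 2, 64) + "\n"
}

func main() {
	fmt.Print(formatSampledTimer("test", 7, 0.33)) // test:7|ms|@0.33
}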
+++ b/net_sink.go @@ -293,7 +293,7 @@ func (s *netSink) FlushSampledTimer(name string, value, sampleRate float64) { b.WriteString(timerSuffix) b.WriteString(sampleSuffix) - b.writeFloat64WithPercision(sampleRate, 2) // todo: deteremine how many decimal places we need + b.writeFloat64WithPrecision(sampleRate, 2) // todo: deteremine how many decimal places we need b.WriteString(metricSuffix) s.writeBuffer(b) @@ -447,9 +447,9 @@ func (b *buffer) WriteUnit64(val uint64) { } func (b *buffer) WriteFloat64(val float64) { - b.writeFloat64WithPercision(val, 6) + b.writeFloat64WithPrecision(val, 6) } -func (b *buffer) writeFloat64WithPercision(val float64, precision int) { +func (b *buffer) writeFloat64WithPrecision(val float64, precision int) { *b = strconv.AppendFloat(*b, val, 'f', precision, 64) } diff --git a/settings.go b/settings.go index 23479f0..d98077e 100644 --- a/settings.go +++ b/settings.go @@ -24,7 +24,10 @@ const ( DefaultUseReservoirTimer = false // FixedTimerReservoirSize is the max capacity of the reservoir for reservoir timers. // note: needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 - // todo: see if we can use not-strict number and just account for the offset + // todo: see if it's worth efficency trade off to reduce tech debt of this magic number and allowing any number. + // we could determine the difference between the defined size and next power of two + // and use that to offset the counter when ANDing it against the mask, + // once the result is 0 we just increment offset by "original offset" FixedTimerReservoirSize = 128 ) @@ -92,6 +95,7 @@ func envBool(key string, def bool) (bool, error) { } // GetSettings returns the Settings gostats will run with. +// todo: can we optimize this by storing the result for subsequent calls func GetSettings() Settings { useStatsd, err := envBool("USE_STATSD", DefaultUseStatsd) if err != nil { diff --git a/stats.go b/stats.go index 15c58de..26763a4 100644 --- a/stats.go +++ b/stats.go @@ -215,24 +215,33 @@ type StatGenerator interface { // Note: the export argument is unused. func NewStore(sink Sink, _ bool) Store { return &statStore{ - sink: sink, - conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient by computing it once and storing for subsequent gets + sink: sink, + timerType: standard, } } +func newStatStore(sink Sink, export bool, conf Settings) *statStore { + store := NewStore(sink, export).(*statStore) + if conf.UseReservoirTimer { + store.timerType = reservoir + } + return store +} + // NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer. 
+// note: this is the only way to use reservoir timers as they rely on the store flush loop func NewDefaultStore() Store { var newStore Store settings := GetSettings() if !settings.UseStatsd { if settings.LoggingSinkDisabled { - newStore = NewStore(NewNullSink(), false) + newStore = newStatStore(NewNullSink(), false, settings) } else { - newStore = NewStore(NewLoggingSink(), false) + newStore = newStatStore(NewLoggingSink(), false, settings) } go newStore.Start(time.NewTicker(10 * time.Second)) } else { - newStore = NewStore(NewTCPStatsdSink(), false) + newStore = newStatStore(NewTCPStatsdSink(), false, settings) go newStore.Start(time.NewTicker(time.Duration(settings.FlushIntervalS) * time.Second)) } return newStore @@ -301,6 +310,13 @@ func (c *gauge) Value() uint64 { return atomic.LoadUint64(&c.value) } +type timerType int + +const ( + standard timerType = iota + reservoir +) + type timer interface { time(time.Duration) AddDuration(time.Duration) @@ -428,16 +444,15 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) { type statStore struct { // these maps may grow indefinitely however slots in this maps are reused as stats names are stable over the lifetime of the process - counters sync.Map - gauges sync.Map - timers sync.Map + counters sync.Map + gauges sync.Map + timers sync.Map + timerType timerType mu sync.RWMutex statGenerators []StatGenerator sink Sink - - conf Settings } var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true} @@ -605,7 +620,8 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { } var t timer - if s.conf.UseReservoirTimer { + switch s.timerType { + case reservoir: t = &reservoirTimer{ name: serializedName, base: base, @@ -613,7 +629,9 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { ringMask: FixedTimerReservoirSize - 1, values: make([]float64, FixedTimerReservoirSize), } - } else { + case standard: // this should allow backward compatible a backwards compatible fallback as standard is the zero value of s.timerType + fallthrough + default: t = &standardTimer{ name: serializedName, sink: s.sink, diff --git a/stats_test.go b/stats_test.go index bb39509..c1e3b0e 100644 --- a/stats_test.go +++ b/stats_test.go @@ -138,7 +138,7 @@ func TestTimerReservoir_Disabled(t *testing.T) { expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) @@ -179,7 +179,7 @@ func TestTimerReservoir_Overflow(t *testing.T) { expectedStatCount := FixedTimerReservoirSize ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) // this should equate to a 0.1 sample rate; 0.1 * 1280 = 128 for i := 0; i < statsToSend; i++ { @@ -222,7 +222,7 @@ func TestTimerReservoir_Full(t *testing.T) { expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) @@ -264,7 +264,7 @@ func TestTimerReservoir_NotFull(t *testing.T) { expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) - store 
:= NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) @@ -306,7 +306,7 @@ func TestTimerReservoir_IndependantReservoirs(t *testing.T) { expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) for i := 0; i < statsToSend; i++ { store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10)) // use different names so that we don't conflate the metrics into the same reservoir @@ -348,7 +348,7 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { expectedStatCount := statsToSend ts, sink := setupTestNetSink(t, "tcp", false) - store := NewStore(sink, true) + store := newStatStore(sink, true, GetSettings()) for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) @@ -414,6 +414,8 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } +// todo: add test coverage for NewDefaultStore and the automatic flush loop + // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { sink := &testStatSink{} From 8dad5edeae3867c22ce27cc1c7e6ac74473286b4 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Wed, 15 Jan 2025 23:45:50 -0500 Subject: [PATCH 22/32] fix typo in comment --- settings.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/settings.go b/settings.go index d98077e..d052129 100644 --- a/settings.go +++ b/settings.go @@ -24,7 +24,7 @@ const ( DefaultUseReservoirTimer = false // FixedTimerReservoirSize is the max capacity of the reservoir for reservoir timers. // note: needs to be rounded to a power of two e.g. 1 << bits.Len(uint(100)) = 128 - // todo: see if it's worth efficency trade off to reduce tech debt of this magic number and allowing any number. + // todo: see if it's worth an efficiency trade off to reduce tech debt of this magic number and allowing any number. 
// we could determine the difference between the defined size and next power of two // and use that to offset the counter when ANDing it against the mask, // once the result is 0 we just increment offset by "original offset" From 976215245c1e4f9f51d5bd0bc4ff1691fc46dd46 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 15:19:53 -0500 Subject: [PATCH 23/32] add test for reservoir automatic flushing fix data race in reservoirTimer --- stats.go | 6 ++--- stats_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/stats.go b/stats.go index 26763a4..9dc6833 100644 --- a/stats.go +++ b/stats.go @@ -387,9 +387,9 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - // direct access to t.count and t.ringMask is protected by the mutex - t.values[t.count&t.ringMask] = value - t.count++ + // t.ringMask isn't ever changed and accessing it here is protected by the mutex + t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value + atomic.AddUint64(&t.count, 1) } func (t *reservoirTimer) AllocateSpan() Timespan { diff --git a/stats_test.go b/stats_test.go index c1e3b0e..658f9e4 100644 --- a/stats_test.go +++ b/stats_test.go @@ -181,7 +181,6 @@ func TestTimerReservoir_Overflow(t *testing.T) { ts, sink := setupTestNetSink(t, "tcp", false) store := newStatStore(sink, true, GetSettings()) - // this should equate to a 0.1 sample rate; 0.1 * 1280 = 128 for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) } @@ -414,7 +413,71 @@ func TestTimerReservoir_ReusedStore(t *testing.T) { os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } -// todo: add test coverage for NewDefaultStore and the automatic flush loop +func TestTimerReservoir_AutomaticFlush(t *testing.T) { + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") + if err != nil { + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) + } + + flushIntervalS := 5 + expectedStatCount := FixedTimerReservoirSize * 2 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := newStatStore(sink, true, GetSettings()) + + ctx, cancel := context.WithCancel(context.Background()) + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalS)*time.Second)) + }() + + // first reservoir timer + statsToSend := FixedTimerReservoirSize + for i := 0; i < statsToSend; i++ { + store.NewTimer("test1").AddValue(float64(i % 10)) + } + + // second reservoir timer + statsToSend = FixedTimerReservoirSize * 3 + for i := 0; i < statsToSend; i++ { + store.NewTimer("test2").AddValue(float64(i % 10)) + } + + if ts.String() != "" { + t.Errorf("Stats were written pre flush invalidating the test") + } + + time.Sleep(time.Duration(flushIntervalS+1) * time.Second) // increment a second to allow the flush to happen + + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + name := strings.Split(stat, ":")[0] + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if name == "test1" && sampleRate != "1.00" { + t.Errorf("The test1 stat was written without a 1.00 sample rate: %s", stat) + } else if name == "test2" && 
sampleRate != "0.33" { + t.Errorf("The test2 stat was written without a 0.33 sample rate: %s", stat) + } else if name != "test1" && name != "test2" { + t.Errorf("A unknown stat was written: %s", stat) + } + } + + cancel() + wg.Wait() + + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") +} // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { From 264192494f7a8ae6f5c6d8c11d4c1c928fc81cd4 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 16:49:35 -0500 Subject: [PATCH 24/32] add test for concurrent reservoir writes and flushing --- stats.go | 2 +- stats_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/stats.go b/stats.go index 9dc6833..03457c8 100644 --- a/stats.go +++ b/stats.go @@ -387,7 +387,7 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - // t.ringMask isn't ever changed and accessing it here is protected by the mutex + // t.ringMask isn't ever changed so the access to should fine t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value atomic.AddUint64(&t.count, 1) } diff --git a/stats_test.go b/stats_test.go index 658f9e4..99aad3b 100644 --- a/stats_test.go +++ b/stats_test.go @@ -479,6 +479,57 @@ func TestTimerReservoir_AutomaticFlush(t *testing.T) { os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") } +func TestTimerReservoir_ConcurrentFlushingWhileWrites(t *testing.T) { + err := os.Setenv("GOSTATS_USE_RESERVOIR_TIMER", "true") + if err != nil { + t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) + } + + flushIntervalMs := 5 + expectedStatCount := FixedTimerReservoirSize * 2 + + ts, sink := setupTestNetSink(t, "tcp", false) + store := newStatStore(sink, true, GetSettings()) + + ctx, cancel := context.WithCancel(context.Background()) + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalMs)*time.Millisecond)) + }() + + statsToSend := expectedStatCount + for i := 0; i < statsToSend; i++ { + store.NewTimer("test").AddValue(float64(i % 10)) + time.Sleep(time.Duration(flushIntervalMs/5) * time.Millisecond) + } + + time.Sleep(time.Duration(flushIntervalMs+1) * time.Millisecond) // guarentee we finish flushing + + stats := strings.Split(ts.String(), "\n") + statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer + if statCount != expectedStatCount { + t.Errorf("Not all stats were written\ngot: %d\nwanted: %d", statCount, expectedStatCount) + } + + stats = stats[:statCount] + for _, stat := range stats { + value := strings.Split(stat, ":")[1] + sampleRate := strings.Split(value, ("|@"))[1] + if sampleRate != "1.00" { + t.Errorf("The test1 stat was written without a 1.00 sample rate: %s", stat) + } + } + + cancel() + wg.Wait() + + os.Unsetenv("GOSTATS_USE_RESERVOIR_TIMER") +} + // Ensure 0 counters are not flushed func TestZeroCounters(t *testing.T) { sink := &testStatSink{} From 858a3fdde936483c16278d5f634e4cb462b152ae Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 16:50:30 -0500 Subject: [PATCH 25/32] fix typo in comment --- stats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats_test.go b/stats_test.go index 99aad3b..0cb4f7d 100644 --- a/stats_test.go +++ b/stats_test.go @@ -507,7 +507,7 @@ func TestTimerReservoir_ConcurrentFlushingWhileWrites(t *testing.T) { time.Sleep(time.Duration(flushIntervalMs/5) * time.Millisecond) } - 
time.Sleep(time.Duration(flushIntervalMs+1) * time.Millisecond) // guarentee we finish flushing + time.Sleep(time.Duration(flushIntervalMs+1) * time.Millisecond) // guarantee we finish flushing stats := strings.Split(ts.String(), "\n") statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer From 4e9611db7de9ca240f59d30bb3c9cbc5d8ed3ac4 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 18:14:03 -0500 Subject: [PATCH 26/32] protect writes while flushing --- stats.go | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/stats.go b/stats.go index 03457c8..071c887 100644 --- a/stats.go +++ b/stats.go @@ -325,6 +325,8 @@ type timer interface { GetValue(int) float64 ValueCount() int SampleRate() float64 + Lock() + Unlock() Reset() } @@ -362,6 +364,12 @@ func (t *standardTimer) SampleRate() float64 { return 1.0 // metrics which are not sampled have an implicit sample rate 1.0 } +// no support or need for concurrency +func (t *standardTimer) Lock() {} + +// no support or need for concurrency +func (t *standardTimer) Unlock() {} + // nothing to persisted in memroy for this timer func (t *standardTimer) Reset() {} @@ -369,8 +377,8 @@ type reservoirTimer struct { mu sync.Mutex base time.Duration name string - ringSize uint64 // just used so that we don't have to re-evaluate capacity of values - ringMask uint64 + ringSize uint64 // this value shouldn't change, just used so that we don't have to re-evaluate capacity of values + ringMask uint64 // this value shouldn't change values []float64 count uint64 } @@ -387,7 +395,6 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - // t.ringMask isn't ever changed so the access to should fine t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value atomic.AddUint64(&t.count, 1) } @@ -397,15 +404,12 @@ func (t *reservoirTimer) AllocateSpan() Timespan { } func (t *reservoirTimer) GetValue(index int) float64 { - t.mu.Lock() - defer t.mu.Unlock() - return t.values[index] } func (t *reservoirTimer) ValueCount() int { count := atomic.LoadUint64(&t.count) - ringSize := atomic.LoadUint64(&t.ringSize) + ringSize := t.ringSize if count > ringSize { return int(ringSize) @@ -415,7 +419,7 @@ func (t *reservoirTimer) ValueCount() int { func (t *reservoirTimer) SampleRate() float64 { count := atomic.LoadUint64(&t.count) - ringSize := atomic.LoadUint64(&t.ringSize) + ringSize := t.ringSize if count <= ringSize { return 1.0 @@ -423,6 +427,14 @@ func (t *reservoirTimer) SampleRate() float64 { return float64(ringSize) / float64(count) } +func (t *reservoirTimer) Lock() { + t.mu.Lock() +} + +func (t *reservoirTimer) Unlock() { + t.mu.Unlock() +} + func (t *reservoirTimer) Reset() { atomic.StoreUint64(&t.count, 0) } @@ -504,6 +516,11 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { + // lock while flushing to: + // 1. provide correct sample rate + // 2. allow for exit despite continuous writes + // 3. 
reduce metric loss from writes after flush and before reset + timer.Lock() sampleRate := timer.SampleRate() // since the map memory is reused only process what we accumulated in the current processing itteration @@ -512,6 +529,7 @@ func (s *statStore) Flush() { } timer.Reset() + timer.Unlock() } return true From 70cc61c4d3b90a3860f5c85a09a3fd96d7677046 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 18:18:20 -0500 Subject: [PATCH 27/32] dont export controls that can result in a deadlock or datarace --- stats.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/stats.go b/stats.go index 071c887..6828c7d 100644 --- a/stats.go +++ b/stats.go @@ -319,15 +319,15 @@ const ( type timer interface { time(time.Duration) + lock() + unlock() + reset() AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan GetValue(int) float64 ValueCount() int SampleRate() float64 - Lock() - Unlock() - Reset() } type standardTimer struct { @@ -365,13 +365,13 @@ func (t *standardTimer) SampleRate() float64 { } // no support or need for concurrency -func (t *standardTimer) Lock() {} +func (t *standardTimer) lock() {} // no support or need for concurrency -func (t *standardTimer) Unlock() {} +func (t *standardTimer) unlock() {} // nothing to persisted in memroy for this timer -func (t *standardTimer) Reset() {} +func (t *standardTimer) reset() {} type reservoirTimer struct { mu sync.Mutex @@ -427,15 +427,15 @@ func (t *reservoirTimer) SampleRate() float64 { return float64(ringSize) / float64(count) } -func (t *reservoirTimer) Lock() { +func (t *reservoirTimer) lock() { t.mu.Lock() } -func (t *reservoirTimer) Unlock() { +func (t *reservoirTimer) unlock() { t.mu.Unlock() } -func (t *reservoirTimer) Reset() { +func (t *reservoirTimer) reset() { atomic.StoreUint64(&t.count, 0) } @@ -520,7 +520,7 @@ func (s *statStore) Flush() { // 1. provide correct sample rate // 2. allow for exit despite continuous writes // 3. reduce metric loss from writes after flush and before reset - timer.Lock() + timer.lock() sampleRate := timer.SampleRate() // since the map memory is reused only process what we accumulated in the current processing itteration @@ -528,8 +528,8 @@ func (s *statStore) Flush() { s.sink.FlushSampledTimer(key.(string), timer.GetValue(i), sampleRate) } - timer.Reset() - timer.Unlock() + timer.reset() + timer.unlock() } return true From b352a4f99c3fa412332bc955def21b4755e3110a Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Thu, 16 Jan 2025 18:29:57 -0500 Subject: [PATCH 28/32] add critical optimization todo --- stats.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stats.go b/stats.go index 6828c7d..977eb07 100644 --- a/stats.go +++ b/stats.go @@ -516,6 +516,8 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { + // todo: this locking is uncessary, rewrite reservoirTimer to return all values and clear the counter and unlock right away, we can calculate sample rate here + // lock while flushing to: // 1. provide correct sample rate // 2. 
allow for exit despite continuous writes From ef8cf0bd1467c89862ca888470f05cfcf277aa3b Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 17 Jan 2025 10:20:39 -0500 Subject: [PATCH 29/32] simplify reservoir processing --- stats.go | 108 +++++++++++++++++--------------------------------- stats_test.go | 8 ++-- 2 files changed, 40 insertions(+), 76 deletions(-) diff --git a/stats.go b/stats.go index 977eb07..5e7ec7b 100644 --- a/stats.go +++ b/stats.go @@ -319,15 +319,10 @@ const ( type timer interface { time(time.Duration) - lock() - unlock() - reset() AddDuration(time.Duration) AddValue(float64) AllocateSpan() Timespan - GetValue(int) float64 - ValueCount() int - SampleRate() float64 + Empty() ([]float64, uint64) } type standardTimer struct { @@ -352,33 +347,17 @@ func (t *standardTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *standardTimer) GetValue(_ int) float64 { - return 0.0 // since we flush right away nothing will be collected +// values are not collected for this timer +func (t *standardTimer) Empty() ([]float64, uint64) { + return nil, 0 } -func (t *standardTimer) ValueCount() int { - return 0 // since we flush right away nothing will be collected -} - -func (t *standardTimer) SampleRate() float64 { - return 1.0 // metrics which are not sampled have an implicit sample rate 1.0 -} - -// no support or need for concurrency -func (t *standardTimer) lock() {} - -// no support or need for concurrency -func (t *standardTimer) unlock() {} - -// nothing to persisted in memroy for this timer -func (t *standardTimer) reset() {} - type reservoirTimer struct { mu sync.Mutex base time.Duration name string - ringSize uint64 // this value shouldn't change, just used so that we don't have to re-evaluate capacity of values - ringMask uint64 // this value shouldn't change + RingSize uint64 + ringMask uint64 values []float64 count uint64 } @@ -395,48 +374,36 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value - atomic.AddUint64(&t.count, 1) + t.values[t.count&t.ringMask] = value + t.count++ } func (t *reservoirTimer) AllocateSpan() Timespan { return ×pan{timer: t, start: time.Now()} } -func (t *reservoirTimer) GetValue(index int) float64 { - return t.values[index] -} - -func (t *reservoirTimer) ValueCount() int { - count := atomic.LoadUint64(&t.count) - ringSize := t.ringSize - - if count > ringSize { - return int(ringSize) - } - return int(count) -} +// resets the reservoir returning all valeus collected and a count of the total inflow, +// including what exceeded the collection size and was dropped +func (t *reservoirTimer) Empty() ([]float64, uint64) { + t.mu.Lock() + defer t.mu.Unlock() -func (t *reservoirTimer) SampleRate() float64 { - count := atomic.LoadUint64(&t.count) - ringSize := t.ringSize + count := t.count - if count <= ringSize { - return 1.0 + var accumulation uint64 + if count > t.RingSize { + accumulation = t.RingSize + } else { + accumulation = count } - return float64(ringSize) / float64(count) -} -func (t *reservoirTimer) lock() { - t.mu.Lock() -} + // make a copy to avoid data races + values := make([]float64, accumulation) + copy(values, t.values[:accumulation]) // since the slice memory is reused only copy what we accumulated in the current processing itteration -func (t *reservoirTimer) unlock() { - t.mu.Unlock() -} + t.count = 0 // new values can start being written to the slice -func (t *reservoirTimer) reset() { - 
atomic.StoreUint64(&t.count, 0) + return values, count } type timespan struct { @@ -516,22 +483,19 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { - // todo: this locking is uncessary, rewrite reservoirTimer to return all values and clear the counter and unlock right away, we can calculate sample rate here - - // lock while flushing to: - // 1. provide correct sample rate - // 2. allow for exit despite continuous writes - // 3. reduce metric loss from writes after flush and before reset - timer.lock() - sampleRate := timer.SampleRate() - - // since the map memory is reused only process what we accumulated in the current processing itteration - for i := 0; i < timer.ValueCount(); i++ { - s.sink.FlushSampledTimer(key.(string), timer.GetValue(i), sampleRate) + values, count := timer.Empty() + reservoirSize := timer.RingSize + + var sampleRate float64 + if count <= reservoirSize { + sampleRate = 1.0 + } else { + sampleRate = float64(reservoirSize) / float64(count) } - timer.reset() - timer.unlock() + for _, value := range values { + s.sink.FlushSampledTimer(key.(string), value, sampleRate) + } } return true @@ -645,7 +609,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { t = &reservoirTimer{ name: serializedName, base: base, - ringSize: FixedTimerReservoirSize, + RingSize: FixedTimerReservoirSize, ringMask: FixedTimerReservoirSize - 1, values: make([]float64, FixedTimerReservoirSize), } diff --git a/stats_test.go b/stats_test.go index 0cb4f7d..706a7e1 100644 --- a/stats_test.go +++ b/stats_test.go @@ -485,7 +485,7 @@ func TestTimerReservoir_ConcurrentFlushingWhileWrites(t *testing.T) { t.Fatalf("Failed to set GOSTATS_USE_RESERVOIR_TIMER environment variable: %s", err) } - flushIntervalMs := 5 + flushIntervalMS := 5 expectedStatCount := FixedTimerReservoirSize * 2 ts, sink := setupTestNetSink(t, "tcp", false) @@ -498,16 +498,16 @@ func TestTimerReservoir_ConcurrentFlushingWhileWrites(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalMs)*time.Millisecond)) + store.StartContext(ctx, time.NewTicker(time.Duration(flushIntervalMS)*time.Millisecond)) }() statsToSend := expectedStatCount for i := 0; i < statsToSend; i++ { store.NewTimer("test").AddValue(float64(i % 10)) - time.Sleep(time.Duration(flushIntervalMs/5) * time.Millisecond) + time.Sleep(time.Duration(flushIntervalMS/5) * time.Millisecond) } - time.Sleep(time.Duration(flushIntervalMs+1) * time.Millisecond) // guarantee we finish flushing + time.Sleep(time.Duration(flushIntervalMS+1) * time.Millisecond) // guarantee we finish flushing stats := strings.Split(ts.String(), "\n") statCount := len(stats) - 1 // there will be 1 extra new line character at the end of the buffer From 8cceead13cb6997e33ec2b86c1e6e731d5ffedaa Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 17 Jan 2025 10:48:34 -0500 Subject: [PATCH 30/32] unexport RingSize and document immutability --- stats.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stats.go b/stats.go index 5e7ec7b..f38980c 100644 --- a/stats.go +++ b/stats.go @@ -356,8 +356,8 @@ type reservoirTimer struct { mu sync.Mutex base time.Duration name string - RingSize uint64 - ringMask uint64 + ringSize uint64 // immutable + ringMask uint64 // immutable values []float64 count uint64 } @@ -391,8 +391,8 @@ func (t *reservoirTimer) Empty() ([]float64, uint64) { count := t.count var accumulation 
uint64 - if count > t.RingSize { - accumulation = t.RingSize + if count > t.ringSize { + accumulation = t.ringSize } else { accumulation = count } @@ -484,7 +484,7 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { values, count := timer.Empty() - reservoirSize := timer.RingSize + reservoirSize := timer.ringSize // assuming this is immutable var sampleRate float64 if count <= reservoirSize { @@ -609,7 +609,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { t = &reservoirTimer{ name: serializedName, base: base, - RingSize: FixedTimerReservoirSize, + ringSize: FixedTimerReservoirSize, ringMask: FixedTimerReservoirSize - 1, values: make([]float64, FixedTimerReservoirSize), } From 4cae9accdc9c0297ef4902262c0a3dab5a7b62f4 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 17 Jan 2025 12:13:58 -0500 Subject: [PATCH 31/32] print to stdout for testing --- stats.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/stats.go b/stats.go index f38980c..473b2d4 100644 --- a/stats.go +++ b/stats.go @@ -2,6 +2,7 @@ package stats import ( "context" + "fmt" "strconv" "sync" "sync/atomic" @@ -496,6 +497,8 @@ func (s *statStore) Flush() { for _, value := range values { s.sink.FlushSampledTimer(key.(string), value, sampleRate) } + + fmt.Printf("Flushed %d timer samples for metric: %s", len(values), key.(string)) // todo: either remove or convert to debug logging } return true @@ -613,6 +616,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { ringMask: FixedTimerReservoirSize - 1, values: make([]float64, FixedTimerReservoirSize), } + fmt.Printf("New reservoirTimer created") // todo: either remove or convert to debug logging case standard: // this should allow backward compatible a backwards compatible fallback as standard is the zero value of s.timerType fallthrough default: @@ -621,6 +625,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { sink: s.sink, base: base, } + fmt.Printf("New standardTimer created") // todo: either remove or convert to debug logging } if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { From 7fe28932329b37556baf9ef181b24c6cd0af3e64 Mon Sep 17 00:00:00 2001 From: Matthew Kuzminski Date: Fri, 17 Jan 2025 18:41:23 -0500 Subject: [PATCH 32/32] improve test logging --- logging_sink.go | 4 ++++ net_sink.go | 1 + stats.go | 18 ++++++++++++++---- stats_test.go | 6 +++--- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/logging_sink.go b/logging_sink.go index 04c3aa8..f1c681a 100644 --- a/logging_sink.go +++ b/logging_sink.go @@ -107,3 +107,7 @@ func (s *loggingSink) Errorf(msg string, args ...interface{}) { func (s *loggingSink) Warnf(msg string, args ...interface{}) { s.logMessage("warn", fmt.Sprintf(msg, args...)) } + +func (s *loggingSink) Infof(msg string, args ...interface{}) { + s.logMessage("info", fmt.Sprintf(msg, args...)) +} diff --git a/net_sink.go b/net_sink.go index e0fde54..a35c2ac 100644 --- a/net_sink.go +++ b/net_sink.go @@ -20,6 +20,7 @@ import ( type Logger interface { Errorf(msg string, args ...interface{}) Warnf(msg string, args ...interface{}) + Infof(msg string, args ...interface{}) } const ( diff --git a/stats.go b/stats.go index 473b2d4..ba31963 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,7 @@ package stats import ( "context" - "fmt" + "os" "strconv" "sync" "sync/atomic" @@ -218,6 +218,7 @@ func NewStore(sink Sink, _ bool) Store { return &statStore{ sink: 
sink, timerType: standard, + log: &loggingSink{writer: os.Stderr, now: time.Now}, // should always be initialized since this is not exported todo: not sure if we really need this } } @@ -433,6 +434,7 @@ type statStore struct { statGenerators []StatGenerator sink Sink + log Logger } var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true} @@ -482,6 +484,9 @@ func (s *statStore) Flush() { return true }) + flushedTimers := 0 + skippedTimers := 0 + flushedStats := 0 s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { values, count := timer.Empty() @@ -498,11 +503,16 @@ func (s *statStore) Flush() { s.sink.FlushSampledTimer(key.(string), value, sampleRate) } - fmt.Printf("Flushed %d timer samples for metric: %s", len(values), key.(string)) // todo: either remove or convert to debug logging + flushedTimers++ + flushedStats += len(values) + } else { + skippedTimers++ } return true }) + s.log.Infof("Flushed %d timers including %d samples", flushedTimers, flushedStats) // todo: either remove or convert to debug logging + s.log.Infof("Ignored %d timers", skippedTimers) // todo: either remove or convert to debug logging flushableSink, ok := s.sink.(FlushableSink) if ok { @@ -616,7 +626,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { ringMask: FixedTimerReservoirSize - 1, values: make([]float64, FixedTimerReservoirSize), } - fmt.Printf("New reservoirTimer created") // todo: either remove or convert to debug logging + s.log.Infof("New reservoirTimer created") // todo: either remove or convert to debug logging case standard: // this should allow backward compatible a backwards compatible fallback as standard is the zero value of s.timerType fallthrough default: @@ -625,7 +635,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer { sink: s.sink, base: base, } - fmt.Printf("New standardTimer created") // todo: either remove or convert to debug logging + s.log.Infof("New standardTimer created") // todo: either remove or convert to debug logging } if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded { diff --git a/stats_test.go b/stats_test.go index 706a7e1..faf0fe0 100644 --- a/stats_test.go +++ b/stats_test.go @@ -604,8 +604,8 @@ func TestTagMapNotModified(t *testing.T) { } scopeGenerators := map[string]func() Scope{ - "statStore": func() Scope { return &statStore{} }, - "subScope": func() Scope { return newSubScope(&statStore{}, "name", nil) }, + "statStore": func() Scope { return &statStore{log: discardLogger()} }, + "subScope": func() Scope { return newSubScope(&statStore{log: discardLogger()}, "name", nil) }, } methodTestCases := map[string]TagMethod{ @@ -764,7 +764,7 @@ func TestPerInstanceStats(t *testing.T) { testPerInstanceMethods := func(t *testing.T, setupScope func(Scope) Scope) { for _, x := range testCases { sink := mock.NewSink() - scope := setupScope(&statStore{sink: sink}) + scope := setupScope(&statStore{sink: sink, log: discardLogger()}) scope.NewPerInstanceCounter("name", x.tags).Inc() scope.NewPerInstanceGauge("name", x.tags).Inc()
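
For readers skimming the series, here is a minimal, self-contained sketch of the ring-buffer reservoir and sample-rate math that patches 26 through 30 converge on. It is illustrative only and not part of gostats; the names miniReservoir and Drain are invented for this example. Writes index a fixed, power-of-two ring with count&mask, the flush path copies out the retained samples and resets the counter under one short lock, and the sample rate reported to the sink is ringSize/count whenever the inflow exceeded the ring.

// Hypothetical standalone sketch, not the gostats implementation.
package main

import (
	"fmt"
	"sync"
)

const ringSize = 8 // power of two, so count&mask can replace a modulo

type miniReservoir struct {
	mu     sync.Mutex
	mask   uint64
	values []float64
	count  uint64 // total observations, including ones overwritten in the ring
}

func newMiniReservoir() *miniReservoir {
	return &miniReservoir{mask: ringSize - 1, values: make([]float64, ringSize)}
}

func (r *miniReservoir) Add(v float64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.values[r.count&r.mask] = v // once full, overwrite the oldest slot
	r.count++
}

// Drain returns a copy of the retained samples plus the total inflow and
// resets the counter, mirroring what Empty() does in the patches.
func (r *miniReservoir) Drain() ([]float64, uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	total := r.count
	kept := total
	if kept > ringSize {
		kept = ringSize
	}
	out := make([]float64, kept)
	copy(out, r.values[:kept])
	r.count = 0
	return out, total
}

func main() {
	r := newMiniReservoir()
	for i := 0; i < 20; i++ {
		r.Add(float64(i))
	}
	values, total := r.Drain()
	sampleRate := 1.0
	if total > uint64(len(values)) {
		sampleRate = float64(len(values)) / float64(total) // here 8/20 = 0.4
	}
	fmt.Println(values, total, sampleRate)
}

Returning both the copied values and the raw count from the drain step lets the caller compute the sample rate outside the lock, which is the same simplification PATCH 29 applies to Flush.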