From 72bc838ecc56dd776154aa5935731ebe10c8f7af Mon Sep 17 00:00:00 2001
From: Matthew Kuzminski
Date: Wed, 8 Jan 2025 13:25:27 -0500
Subject: [PATCH] prototype a simple reservoir for timers

---
Review notes (this area is not part of the commit message):

* This copy was rebuilt from a whitespace-collapsed paste. Line breaks, tabs
  and gofmt-style alignment were reconstructed so that every hunk matches the
  line counts in its @@ header and the diffstat below. The exact original
  whitespace could not be recovered; if context lines fail to match, try
  `git apply --ignore-whitespace`. The author e-mail address is missing from
  the From: header in this copy.
* Fixed while rebuilding (all line-count neutral): the two `&timespan{...}`
  literals had lost their `&times` prefix to an HTML-entity decode; the three
  new tests reported GOSTATS_BATCH_ENABLED in their Fatalf message although
  they set GOSTATS_TIMER_RESERVOIR_SIZE; the TestValidateTags assertions used
  `&&` between two negated checks and so only failed when both checks failed;
  Flush re-read the timer value after the zero check instead of reusing it;
  comment typos. The blob ids on the `index` lines and the commit id above
  therefore no longer describe the resulting content exactly.
* Open issues left as-is because fixing them changes line counts:
  - statStore.timerCount is a plain int that newTimer and Flush modify; no
    locking or atomic access is visible in these hunks, while the timers map
    itself is a sync.Map.
  - GetSettings() re-parses the environment on every newTimer cache miss and
    on every Flush (see the existing "todo: move this" comments).
  - reservoirTimer.AddValue overwrites the stored value, so only the last
    value per timer name is flushed.
  - The new tests use os.Setenv and only unset the variable on the last line
    of the test; t.Setenv would restore it on every exit path. Each new test
    also sleeps for about one second.

 settings.go   |  11 +++++
 stats.go      |  97 ++++++++++++++++++++++++++++++++++++++-----
 stats_test.go | 112 ++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 203 insertions(+), 17 deletions(-)

diff --git a/settings.go b/settings.go
index 5b6770e..3a4f268 100644
--- a/settings.go
+++ b/settings.go
@@ -20,6 +20,7 @@ const (
 	DefaultFlushIntervalS = 5
 	// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
 	DefaultLoggingSinkDisabled = false
+	DefaultTimerReservoirSize  = 0
 )

 // The Settings type is used to configure gostats. gostats uses environment
@@ -38,6 +39,7 @@ type Settings struct {
 	// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
 	// This will cause all stats to be silently dropped.
 	LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
+	TimerReservoirSize  int  `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
 }

 // An envError is an error that occurred parsing an environment variable
@@ -101,6 +103,10 @@ func GetSettings() Settings {
 	if err != nil {
 		panic(err)
 	}
+	timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
+	if err != nil {
+		panic(err)
+	}
 	return Settings{
 		UseStatsd:           useStatsd,
 		StatsdHost:          envOr("STATSD_HOST", DefaultStatsdHost),
@@ -108,6 +114,7 @@ func GetSettings() Settings {
 		StatsdPort:          statsdPort,
 		FlushIntervalS:      flushIntervalS,
 		LoggingSinkDisabled: loggingSinkDisabled,
+		TimerReservoirSize:  timerReservoirSize,
 	}
 }

@@ -115,3 +122,7 @@ func GetSettings() Settings {
 func (s *Settings) FlushInterval() time.Duration {
 	return time.Duration(s.FlushIntervalS) * time.Second
 }
+
+func (s *Settings) isTimerReservoirEnabled() bool {
+	return s.TimerReservoirSize > 0
+}
diff --git a/stats.go b/stats.go
index 9f167bd..cd2ceb0 100644
--- a/stats.go
+++ b/stats.go
@@ -2,6 +2,7 @@ package stats

 import (
 	"context"
+	"math"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -298,30 +299,69 @@ func (c *gauge) Value() uint64 {
 	return atomic.LoadUint64(&c.value)
 }

-type timer struct {
+type timer interface {
+	time(time.Duration)
+	AddDuration(time.Duration)
+	AddValue(float64)
+	AllocateSpan() Timespan
+	Value() float64
+}
+
+type standardTimer struct {
 	base time.Duration
 	name string
 	sink Sink
 }

-func (t *timer) time(dur time.Duration) {
+func (t *standardTimer) time(dur time.Duration) {
 	t.AddDuration(dur)
 }

-func (t *timer) AddDuration(dur time.Duration) {
+func (t *standardTimer) AddDuration(dur time.Duration) {
 	t.AddValue(float64(dur / t.base))
 }

-func (t *timer) AddValue(value float64) {
+func (t *standardTimer) AddValue(value float64) {
 	t.sink.FlushTimer(t.name, value)
 }

-func (t *timer) AllocateSpan() Timespan {
+func (t *standardTimer) AllocateSpan() Timespan {
 	return &timespan{timer: t, start: time.Now()}
 }

+func (t *standardTimer) Value() float64 {
+	return 0.0 // float zero value
+}
+
+type reservoirTimer struct {
+	base  time.Duration
+	name  string
+	value uint64
+}
+
+func (t *reservoirTimer) time(dur time.Duration) {
+	t.AddDuration(dur)
+}
+
+func (t *reservoirTimer) AddDuration(dur time.Duration) {
+	t.AddValue(float64(dur / t.base))
+}
+
+func (t *reservoirTimer) AddValue(value float64) {
+	// todo does this need to be atomic? ideally for the use case it won't/shouldn't be changed like a counter/gauge would be
+	atomic.StoreUint64(&t.value, math.Float64bits(value))
+}
+
+func (t *reservoirTimer) AllocateSpan() Timespan {
+	return &timespan{timer: t, start: time.Now()}
+}
+
+func (t *reservoirTimer) Value() float64 {
+	return math.Float64frombits(atomic.LoadUint64(&t.value))
+}
+
 type timespan struct {
-	timer *timer
+	timer timer
 	start time.Time
 }

@@ -340,6 +380,8 @@ type statStore struct {
 	gauges   sync.Map
 	timers   sync.Map

+	timerCount int
+
 	mu             sync.RWMutex
 	statGenerators []StatGenerator

@@ -393,6 +435,20 @@ func (s *statStore) Flush() {
 		return true
 	})

+	settings := GetSettings() // todo: move this to some shared memory
+	if settings.isTimerReservoirEnabled() {
+		s.timers.Range(func(key, v interface{}) bool {
+			// todo: maybe change this to not even add to the reservoir
+			// do not flush timers that are zero value
+			if value := v.(timer).Value(); value != 0.0 {
+				s.sink.FlushTimer(key.(string), value)
+			}
+			s.timers.Delete(key)
+			s.timerCount--
+			return true
+		})
+	}
+
 	flushableSink, ok := s.sink.(FlushableSink)
 	if ok {
 		flushableSink.Flush()
@@ -490,14 +546,35 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
 	return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
 }

-func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
+func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
 	if v, ok := s.timers.Load(serializedName); ok {
-		return v.(*timer)
+		return v.(timer)
+	}
+
+	var t timer
+	settings := GetSettings() // todo: move this to some shared memory
+	if settings.isTimerReservoirEnabled() {
+		t = &reservoirTimer{name: serializedName, base: base}
+
+		// todo: > shouldn't be necessary
+		if s.timerCount >= settings.TimerReservoirSize {
+			// this will delete 1 random timer in the map
+			s.timers.Range(func(key, _ interface{}) bool {
+				s.timers.Delete(key)
+				return false
+			})
+			s.timerCount--
+		}
+	} else {
+		t = &standardTimer{name: serializedName, sink: s.sink, base: base}
 	}
-	t := &timer{name: serializedName, sink: s.sink, base: base}
+
 	if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
-		return v.(*timer)
+		return v.(timer)
 	}
+
+	s.timerCount++
+
 	return t
 }

diff --git a/stats_test.go b/stats_test.go
index 37dbbed..329a396 100644
--- a/stats_test.go
+++ b/stats_test.go
@@ -6,6 +6,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"math/rand"
+	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -78,9 +79,9 @@ func TestValidateTags(t *testing.T) {
 	store.Flush()

 	expected := "test:1|c"
-	counter := sink.record
-	if !strings.Contains(counter, expected) {
-		t.Error("wanted counter value of test:1|c, got", counter)
+	output := sink.record
+	if !strings.Contains(output, expected) || strings.Contains(output, "reserved_tag") {
+		t.Errorf("Expected without reserved tags: '%s' Got: '%s'", expected, output)
 	}

 	// A reserved tag should trigger adding the reserved_tag counter
@@ -89,10 +90,11 @@ func TestValidateTags(t *testing.T) {
 	store.NewCounterWithTags("test", map[string]string{"host": "i"}).Inc()
 	store.Flush()

-	expected = "reserved_tag:1|c\ntest.__host=i:1|c"
-	counter = sink.record
-	if !strings.Contains(counter, expected) {
-		t.Error("wanted counter value of test.___f=i:1|c, got", counter)
+	expected = "test.__host=i:1|c"
+	expectedReservedTag := "reserved_tag:1|c"
+	output = sink.record
+	if !strings.Contains(output, expected) || !strings.Contains(output, expectedReservedTag) {
+		t.Errorf("Expected: '%s' and '%s', In: '%s'", expected, expectedReservedTag, output)
 	}
 }

@@ -126,6 +128,102 @@ func TestMilliTimer(t *testing.T) {
 	}
 }

+func TestTimerResevoir_Disabled(t *testing.T) {
+	err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0")
+	if err != nil {
+		t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
+	}
+
+	expectedStatCount := 1000
+
+	ts, sink := setupTestNetSink(t, "tcp", false)
+	store := NewStore(sink, true)
+
+	for i := 0; i < 1000; i++ {
+		store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
+	}
+
+	if ts.String() != "" {
+		t.Errorf("Stats were written despite forced batching")
+	}
+
+	store.Flush()
+
+	time.Sleep(1001 * time.Millisecond)
+
+	statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
+	if statCount != expectedStatCount {
+		t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
+	}
+
+	os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
+}
+
+func TestTimerReservoir(t *testing.T) {
+	err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
+	if err != nil {
+		t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
+	}
+
+	expectedStatCount := 100
+
+	ts, sink := setupTestNetSink(t, "tcp", false)
+	store := NewStore(sink, true)
+
+	for i := 0; i < 1000; i++ {
+		store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic
+	}
+
+	if ts.String() != "" {
+		t.Errorf("Stats were written despite forced batching")
+	}
+
+	store.Flush()
+
+	time.Sleep(1001 * time.Millisecond)
+
+	statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
+	if statCount != expectedStatCount {
+		t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
+	}
+
+	os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
+}
+
+func TestTimerReservoir_FilteredZeros(t *testing.T) {
+	err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
+	if err != nil {
+		t.Fatalf("Failed to set GOSTATS_TIMER_RESERVOIR_SIZE environment variable: %s", err)
+	}
+
+	ts, sink := setupTestNetSink(t, "tcp", false)
+	store := NewStore(sink, true)
+
+	for i := 0; i < 1000; i++ {
+		store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
+	}
+
+	if ts.String() != "" {
+		t.Errorf("Stats were written despite forced batching")
+	}
+
+	store.Flush()
+
+	time.Sleep(1001 * time.Millisecond)
+
+	stats := strings.Split(ts.String(), "\n")
+	stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer
+	for _, stat := range stats {
+		value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number
+		if value == "0" {
+			t.Errorf("Got a zero value stat: %s", stat)
+		}
+
+	}
+
+	os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
+}
+
 // Ensure 0 counters are not flushed
 func TestZeroCounters(t *testing.T) {
 	sink := &testStatSink{}