Skip to content

Commit

Permalink
prototype a simple reservoir for timers
Browse files Browse the repository at this point in the history
  • Loading branch information
maciuszek committed Jan 8, 2025
1 parent 49e70f1 commit 72bc838
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 17 deletions.
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval returns the flush interval duration.
func (s *Settings) FlushInterval() time.Duration {
return time.Duration(s.FlushIntervalS) * time.Second
}

func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
97 changes: 87 additions & 10 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"math"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -298,30 +299,69 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
type timer interface {
time(time.Duration)
AddDuration(time.Duration)
AddValue(float64)
AllocateSpan() Timespan
Value() float64
}

type standardTimer struct {
base time.Duration
name string
sink Sink
}

func (t *timer) time(dur time.Duration) {
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *standardTimer) Value() float64 {
return 0.0 // float zero value
}

type reservoirTimer struct {
base time.Duration
name string
value uint64
}

func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *reservoirTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *reservoirTimer) AddValue(value float64) {
// todo does this need to be atomtic? ideally for the the use case it won't/shouldn't be changed like a counter/gauge would be
atomic.StoreUint64(&t.value, math.Float64bits(value))
}

func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

func (t *reservoirTimer) Value() float64 {
return math.Float64frombits(atomic.LoadUint64(&t.value))
}

type timespan struct {
timer *timer
timer timer
start time.Time
}

Expand All @@ -340,6 +380,8 @@ type statStore struct {
gauges sync.Map
timers sync.Map

timerCount int

mu sync.RWMutex
statGenerators []StatGenerator

Expand Down Expand Up @@ -393,6 +435,20 @@ func (s *statStore) Flush() {
return true
})

settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
s.timers.Range(func(key, v interface{}) bool {
// todo: maybe change this to not even add to the reservoir
// do not flush timers that are zero value
if value := v.(timer).Value(); value != 0.0 {
s.sink.FlushTimer(key.(string), v.(timer).Value())
}
s.timers.Delete(key)
s.timerCount--
return true
})
}

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +546,35 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}

var t timer
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
t = &reservoirTimer{name: serializedName, base: base}

// todo: > shouldn't be necessary
if s.timerCount >= settings.TimerReservoirSize {
// this will delete 1 random timer in the map
s.timers.Range(func(key, _ interface{}) bool {
s.timers.Delete(key)
return false
})
s.timerCount--
}
} else {
t = &standardTimer{name: serializedName, sink: s.sink, base: base}
}
t := &timer{name: serializedName, sink: s.sink, base: base}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

s.timerCount++

return t
}

Expand Down
112 changes: 105 additions & 7 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"math/rand"
"os"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -78,9 +79,9 @@ func TestValidateTags(t *testing.T) {
store.Flush()

expected := "test:1|c"
counter := sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test:1|c, got", counter)
output := sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, "reserved_tag") {
t.Errorf("Expected without reserved tags: '%s' Got: '%s'", expected, output)
}

// A reserved tag should trigger adding the reserved_tag counter
Expand All @@ -89,10 +90,11 @@ func TestValidateTags(t *testing.T) {
store.NewCounterWithTags("test", map[string]string{"host": "i"}).Inc()
store.Flush()

expected = "reserved_tag:1|c\ntest.__host=i:1|c"
counter = sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test.___f=i:1|c, got", counter)
expected = "test.__host=i:1|c"
expectedReservedTag := "reserved_tag:1|c"
output = sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, expectedReservedTag) {
t.Errorf("Expected: '%s' and '%s', In: '%s'", expected, expectedReservedTag, output)
}
}

Expand Down Expand Up @@ -126,6 +128,102 @@ func TestMilliTimer(t *testing.T) {
}
}

func TestTimerResevoir_Disabled(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "0")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

expectedStatCount := 1000

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

expectedStatCount := 100

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i%10 + 1)) // don't create timers with 0 values to make the count deterministic
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

statCount := len(strings.Split(ts.String(), "\n")) - 1 // there will be 1 extra new line character at the end of the buffer
if statCount != expectedStatCount {
t.Errorf("Not all stats were written\ngot:\n%d\nwanted:\n%d\n", statCount, expectedStatCount)
}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

func TestTimerReservoir_FilteredZeros(t *testing.T) {
err := os.Setenv("GOSTATS_TIMER_RESERVOIR_SIZE", "100")
if err != nil {
t.Fatalf("Failed to set GOSTATS_BATCH_ENABLED environment variable: %s", err)
}

ts, sink := setupTestNetSink(t, "tcp", false)
store := NewStore(sink, true)

for i := 0; i < 1000; i++ {
store.NewTimer("test" + strconv.Itoa(i)).AddValue(float64(i % 10))
}

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

store.Flush()

time.Sleep(1001 * time.Millisecond)

stats := strings.Split(ts.String(), "\n")
stats = stats[:len(stats)-1] // remove the extra new line character at the end of the buffer
for _, stat := range stats {
value := strings.Split(strings.Split(stat, ":")[1], ("|ms"))[0] // strip value and remove suffix and get raw number
if value == "0" {
t.Errorf("Got a zero value stat: %s", stat)
}

}

os.Unsetenv("GOSTATS_TIMER_RESERVOIR_SIZE")
}

// Ensure 0 counters are not flushed
func TestZeroCounters(t *testing.T) {
sink := &testStatSink{}
Expand Down

0 comments on commit 72bc838

Please sign in to comment.