Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reservoir timers #171

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
72bc838
prototype a simple reservoir for timers
maciuszek Jan 8, 2025
f67aca4
clarify comment
maciuszek Jan 8, 2025
5cc34d9
Move s.timerCount inside timer.Range
bshramin Jan 8, 2025
f7fc55e
fix test messages
maciuszek Jan 8, 2025
4a0662a
fix spelling
maciuszek Jan 8, 2025
f35471d
Merge remote-tracking branch 'origin/master' into mattkuzminski/add-t…
maciuszek Jan 10, 2025
57ef42b
re-design with timer reservoirs correctly independent per mertic
maciuszek Jan 10, 2025
4610f55
add some more todos
maciuszek Jan 10, 2025
cc908b5
clean up redundant code
maciuszek Jan 10, 2025
b1a2def
some more clean up
maciuszek Jan 11, 2025
8eb942d
address todos
maciuszek Jan 13, 2025
5dd8757
fix comment
maciuszek Jan 13, 2025
0d3fb45
ensure memory and flush management for timers
maciuszek Jan 13, 2025
ea5ae6a
optimize reservoirTimer by utilizing a ring buffer
maciuszek Jan 14, 2025
e81d603
correct how we flush reusable timer entries
maciuszek Jan 15, 2025
74a26a1
add test for reused timer map after flushing
maciuszek Jan 15, 2025
6d2687c
correct the ring buffer implementation to utilize bitwise benefits
maciuszek Jan 15, 2025
d067744
improve reservoirTimer property access
maciuszek Jan 15, 2025
7e5a451
make reservoir tests more dynamic
maciuszek Jan 16, 2025
a54db1a
improve comments
maciuszek Jan 16, 2025
18c0e57
optimize reservoir timer flush
maciuszek Jan 16, 2025
bf0ef63
block never flush edge cases when stores are constructed outside of N…
maciuszek Jan 16, 2025
8dad5ed
fix typo in comment
maciuszek Jan 16, 2025
9762152
add test for reservoir automatic flushing
maciuszek Jan 16, 2025
2641924
add test for concurrent reservoir writes and flushing
maciuszek Jan 16, 2025
858a3fd
fix typo in comment
maciuszek Jan 16, 2025
4e9611d
protect writes while flushing
maciuszek Jan 16, 2025
70cc61c
dont export controls that can result in a deadlock or datarace
maciuszek Jan 16, 2025
b352a4f
add critical optimization todo
maciuszek Jan 16, 2025
ef8cf0b
simplify reservoir processing
maciuszek Jan 17, 2025
8cceead
unexport RingSize and document immutability
maciuszek Jan 17, 2025
4cae9ac
print to stdout for testing
maciuszek Jan 17, 2025
7fe2893
improve test logging
maciuszek Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *loggingSink) FlushGauge(name string, value uint64) { s.log(name, "gauge

// FlushTimer passes name, the kind label "timer" and value to s.log.
func (s *loggingSink) FlushTimer(name string, value float64) { s.log(name, "timer", value) }

// FlushAggregatedTimer logs an aggregated timer value by delegating to
// FlushTimer. The sample rate argument is ignored, so the resulting log entry
// is the same one FlushTimer would produce for name and value.
func (s *loggingSink) FlushAggregatedTimer(name string, value, _ float64) {
s.FlushTimer(name, value)
}

// Flush passes an empty name, the label "all stats" and a zero value to s.log.
func (s *loggingSink) Flush() { s.log("", "all stats", 0) }

// Logger
Expand Down
6 changes: 6 additions & 0 deletions mock/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (s *Sink) FlushTimer(name string, val float64) {
atomic.AddInt64(&p.count, 1)
}

// FlushAggregatedTimer implements the stats.Sink.FlushAggregatedTimer method and adds val to
// stat name. It forwards to FlushTimer; the sample rate argument is discarded,
// so the mock records the value exactly as it would for a plain timer.
func (s *Sink) FlushAggregatedTimer(name string, val, _ float64) {
s.FlushTimer(name, val)
}

// LoadCounter returns the value for stat name and if it was found.
func (s *Sink) LoadCounter(name string) (uint64, bool) {
v, ok := s.counters().Load(name)
Expand Down
20 changes: 14 additions & 6 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,23 @@ func (s *netSink) FlushGauge(name string, value uint64) {
}

func (s *netSink) FlushTimer(name string, value float64) {
// Since we mistakenly use floating point values to represent time
// durations this method is often passed an integer encoded as a
s.flushFloatOptimized(name, "|ms\n", value)
}

// FlushAggregatedTimer writes a timer value for name with a suffix of the form
// "|ms|@<sampleRate>\n" and hands name, suffix and value to
// flushFloatOptimized.
//
// NOTE(review): the "%.1f" verb keeps a single decimal place, so any sample
// rate below 0.05 is formatted as "@0.0" — confirm whether more precision is
// required before relying on the emitted rate.
func (s *netSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
suffix := fmt.Sprintf("|ms|@%.1f\n", sampleRate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For performance reasons it would be great to avoid fmt.Sprintf here if possible.

s.flushFloatOptimized(name, suffix, value)
}

func (s *netSink) flushFloatOptimized(name, suffix string, value float64) {
// Since we historically used floating point values to represent time
// durations, metrics (particularly timers) are often recorded as an integer encoded as a
// float. Formatting integers is much faster (>2x) than formatting
// floats so use integer formatting whenever possible.
//
// floats, so we should convert to an integer whenever possible.
if 0 <= value && value < math.MaxUint64 && math.Trunc(value) == value {
s.flushUint64(name, "|ms\n", uint64(value))
s.flushUint64(name, suffix, uint64(value))
} else {
s.flushFloat64(name, "|ms\n", value)
s.flushFloat64(name, suffix, value)
}
}

Expand Down
6 changes: 6 additions & 0 deletions net_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (s *testStatSink) FlushTimer(name string, value float64) {
s.Unlock()
}

// FlushAggregatedTimer appends one "name:value|ms|@sampleRate" line to the
// sink's record. The append happens while the sink's lock is held.
func (s *testStatSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	line := fmt.Sprintf("%s:%f|ms|@%f\n", name, value, sampleRate)

	s.Lock()
	defer s.Unlock()
	s.record += line
}

func TestCreateTimer(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)
Expand Down
2 changes: 2 additions & 0 deletions null_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func (s nullSink) FlushGauge(name string, value uint64) {} //nolint:revive

// FlushTimer does nothing; the method body is empty and the value is dropped.
func (s nullSink) FlushTimer(name string, value float64) {} //nolint:revive

// FlushAggregatedTimer does nothing; the value and its sample rate are dropped.
func (s nullSink) FlushAggregatedTimer(name string, value, sampleRate float64) {} //nolint:revive

// Flush is a no-op.
func (s nullSink) Flush() {}
11 changes: 11 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
DefaultTimerReservoirSize = 0
)

// The Settings type is used to configure gostats. gostats uses environment
Expand All @@ -38,6 +39,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
TimerReservoirSize int `envconfig:"GOSTATS_TIMER_RESERVOIR_SIZE" default:"0"`
}

// An envError is an error that occurred parsing an environment variable
Expand Down Expand Up @@ -101,17 +103,26 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
timerReservoirSize, err := envInt("GOSTATS_TIMER_RESERVOIR_SIZE", DefaultTimerReservoirSize)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
TimerReservoirSize: timerReservoirSize,
}
}

// FlushInterval reports the flush period as a time.Duration, treating the
// FlushIntervalS setting as a whole number of seconds.
func (s *Settings) FlushInterval() time.Duration {
	seconds := time.Duration(s.FlushIntervalS)
	return seconds * time.Second
}

// isTimerReservoirEnabled reports whether TimerReservoirSize is greater than
// zero. A zero or negative size yields false.
func (s *Settings) isTimerReservoirEnabled() bool {
return s.TimerReservoirSize > 0
}
1 change: 1 addition & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
FlushAggregatedTimer(name string, value, sampleRate float64)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this we'll need a new major release.

The library usage should be backwards compatible, and additionally there is a feature flag to control the new behaviour, however if anything is implementing this interface it'll break.

}

// FlushableSink is an extension of Sink that provides a Flush() function that
Expand Down
125 changes: 114 additions & 11 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ type StatGenerator interface {
// NewStore returns an Empty store that flushes to Sink passed as an argument.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
return &statStore{sink: sink}
return &statStore{
sink: sink,
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and can be made more efficient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sync.OnceValue function can help eliminate these multiple loads.

}
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
Expand Down Expand Up @@ -298,30 +301,95 @@ func (c *gauge) Value() uint64 {
return atomic.LoadUint64(&c.value)
}

type timer struct {
// timer is the set of operations implemented by both standardTimer and
// reservoirTimer.
type timer interface {
// time records a duration.
time(time.Duration)
// AddDuration records a duration.
AddDuration(time.Duration)
// AddValue records a raw value.
AddValue(float64)
// AllocateSpan returns a Timespan bound to the timer.
AllocateSpan() Timespan
// CollectedValue returns the values the timer is currently holding, if any.
CollectedValue() []float64
// SampleRate returns the sample rate that applies to the collected values.
SampleRate() float64
}

// standardTimer is a timer whose AddValue sends every value to sink right
// away instead of holding it.
type standardTimer struct {
// base is the unit that AddDuration divides durations by.
base time.Duration
// name is the stat name passed to the sink with each value.
name string
// sink receives each value through FlushTimer.
sink Sink
}

func (t *timer) time(dur time.Duration) {
// time records dur; it is a direct call to AddDuration.
func (t *standardTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

func (t *timer) AddDuration(dur time.Duration) {
// AddDuration expresses dur as a count of t.base units and passes the result
// to AddValue. The division is done on integer Durations before the float64
// conversion, so any remainder smaller than one base unit is dropped.
func (t *standardTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *timer) AddValue(value float64) {
// AddValue sends value to the sink immediately via FlushTimer, under the
// timer's name.
func (t *standardTimer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
}

func (t *timer) AllocateSpan() Timespan {
// AllocateSpan returns a new timespan that refers to this timer and whose
// start is the current time.
func (t *standardTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// CollectedValue always returns nil for a standardTimer.
func (t *standardTimer) CollectedValue() []float64 {
return nil // since we flush right away nothing will be collected
}

// SampleRate always returns 1.0 for a standardTimer.
func (t *standardTimer) SampleRate() float64 {
return 1.0 // metrics which are not sampled have an implicit sample rate 1.0
}

// reservoirTimer is a timer that keeps recorded values in memory, up to
// capacity of them, rather than sending each one to a sink as it arrives.
type reservoirTimer struct {
// base is the unit that AddDuration divides durations by.
base time.Duration
name string
// capacity is the number of values AddValue stores before it starts
// discarding the oldest one.
capacity int
// values holds the samples currently retained.
values []float64
// count is incremented on every AddValue call, including calls that discard
// an older value.
count int
// mu is locked by AddValue and CollectedValue around their access to values
// and count.
mu sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit, make the Mutex the first field (it's advantageous performance wise to make the most accessed field first)

}

// time records dur; it is a direct call to AddDuration.
func (t *reservoirTimer) time(dur time.Duration) {
t.AddDuration(dur)
}

// AddDuration expresses dur as a count of t.base units and passes the result
// to AddValue. The division is done on integer Durations before the float64
// conversion, so any remainder smaller than one base unit is dropped.
func (t *reservoirTimer) AddDuration(dur time.Duration) {
t.AddValue(float64(dur / t.base))
}

func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

if t.count < t.capacity {
t.values = append(t.values, value)
} else {
t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Treating this more as a ring buffer when at capacity would be better since it would provide a more accurate representation of the most recent timings (whereas here we only update the first timing):

diff --git a/stats.go b/stats.go
index 70a582b..99a0a0a 100644
--- a/stats.go
+++ b/stats.go
@@ -346,6 +346,7 @@ type reservoirTimer struct {
 	capacity int
 	values   []float64
 	count    int
+	off      int
 	mu       sync.Mutex
 }
 
@@ -364,7 +365,8 @@ func (t *reservoirTimer) AddValue(value float64) {
 	if t.count < t.capacity {
 		t.values = append(t.values, value)
 	} else {
-		t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
+		t.values[t.off%len(t.values)] = value
+		t.off++
 	}
 
 	t.count++

Copy link
Contributor

@charlievieth charlievieth Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, making capacity a power of two would allow for us to use a mask instead of an expensive modulo operation. This can be cheaply calculated using the bits package:

func nextPowerOfTwo(capacity int) uint {
	return 1 << bits.Len(uint(capacity))
}

}

t.count++
}

// AllocateSpan returns a new timespan that refers to this timer and whose
// start is the current time.
func (t *reservoirTimer) AllocateSpan() Timespan {
return &timespan{timer: t, start: time.Now()}
}

// CollectedValue returns a snapshot of the values currently held by the
// timer. The snapshot is a separate slice filled while t.mu is held, so later
// writes to the timer do not show through it and callers may modify it freely.
func (t *reservoirTimer) CollectedValue() []float64 {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Build an independent, exactly-sized slice; the result is never nil.
	snapshot := make([]float64, 0, len(t.values))
	snapshot = append(snapshot, t.values...)
	return snapshot
}

// SampleRate returns the fraction of recorded values that the timer is still
// holding: len(values) divided by count. A timer that has recorded nothing
// reports 1.0, the implicit rate of an unsampled metric.
func (t *reservoirTimer) SampleRate() float64 {
	// values and count are modified by AddValue while t.mu is held; read them
	// under the same lock so a flush running alongside writers does not race.
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.count == 0 {
		// Nothing recorded means nothing dropped; avoid returning 0/0 (NaN).
		return 1.0
	}
	return float64(len(t.values)) / float64(t.count)
}

type timespan struct {
timer *timer
timer timer
start time.Time
}

Expand All @@ -336,6 +404,7 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
// todo: no idea how memory is managed here, when are the map entries ever deleted?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assumption is that stat names are stable over the lifetime of a process, but this is a known issue for processes that incorrectly use this library and cause a cardinality explosion of stat names. See:

counters sync.Map
gauges sync.Map
timers sync.Map
Expand All @@ -344,6 +413,8 @@ type statStore struct {
statGenerators []StatGenerator

sink Sink

conf Settings
}

// ReservedTagWords is a set of tag names, each mapped to true.
// NOTE(review): the code that consults this map is not visible here — confirm
// how a reserved name is treated before documenting it further.
var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true}
Expand Down Expand Up @@ -380,6 +451,8 @@ func (s *statStore) Flush() {
}
s.mu.RUnlock()

// todo: if we're not deleting the data we flush from these maps, won't we just keep resending them?

s.counters.Range(func(key, v interface{}) bool {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
Expand All @@ -393,6 +466,18 @@ func (s *statStore) Flush() {
return true
})

s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
sampleRate := timer.SampleRate()
for _, value := range timer.CollectedValue() {
s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
}
s.timers.Delete(key) // delete it from the map so it's not flushed again
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deletion is expensive and, since we assume the names of stats are stable, will just lead to the entry being re-added to the map at a later time. For counters we only flush non-zero values, and I think we could do something similar with the reservoirTimer type (say, not flush if it has zero values, and trim the slice to zero elements on flush).

}

return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
Expand Down Expand Up @@ -490,14 +575,32 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
return s.newGaugeWithTagSet(name, tagspkg.TagSet(nil).MergePerInstanceTags(tags))
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
return v.(timer)
}
t := &timer{name: serializedName, sink: s.sink, base: base}

var t timer
if s.conf.isTimerReservoirEnabled() {
t = &reservoirTimer{
name: serializedName,
base: base,
capacity: s.conf.TimerReservoirSize,
values: make([]float64, 0, s.conf.TimerReservoirSize),
count: 0,
}
} else {
t = &standardTimer{
name: serializedName,
sink: s.sink,
base: base,
}
}

if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
return v.(timer)
}

return t
}

Expand Down
Loading
Loading