Add support to batch metrics and send them on a strict interval #169

Closed

Commits (35 total; changes shown from 7 commits)
d0c14db
prototype a more definite flush interval control restricting when any…
maciuszek Dec 19, 2024
cefa4c8
fix settings
maciuszek Dec 19, 2024
2f51fd7
add todo to review buffering capacity
maciuszek Dec 19, 2024
384f462
improve naming
maciuszek Dec 20, 2024
90a0f82
fix outc processing competition and allow batching
maciuszek Dec 23, 2024
391d649
fix pre-exisiting volatile test
maciuszek Dec 23, 2024
257ebc4
optimize delays for concurrent action in tests
maciuszek Dec 23, 2024
0d06f21
major fix: previously the batching would have blocked once the buffer…
maciuszek Dec 23, 2024
dba2df8
fix comment
maciuszek Dec 23, 2024
3d32fa5
fix spelling for linter
maciuszek Dec 23, 2024
101187d
a bit of a redesign with improvements to code breakdown
maciuszek Dec 24, 2024
5a6293c
fix comments
maciuszek Dec 24, 2024
446188d
readd doflush outc drain logic
maciuszek Dec 30, 2024
c3b9a08
improve comments
maciuszek Dec 30, 2024
b3722fa
refactor to support retries in batch send
maciuszek Dec 30, 2024
1c80dae
fix lint
maciuszek Dec 30, 2024
d1944c2
add ugly code to cover batching in all scenarios cases and consider e…
maciuszek Dec 31, 2024
b7f0a3d
fix spelling for linter
maciuszek Dec 31, 2024
b1ed20e
guarentee batch doesn't starve
maciuszek Dec 31, 2024
eeb03f2
expand some comments and add todo
maciuszek Jan 2, 2025
40cac66
improve batch send timeout
maciuszek Jan 2, 2025
3837a52
add tests for batch autosend
maciuszek Jan 2, 2025
10b9773
clean up comments
maciuszek Jan 2, 2025
5828040
fix spelling for linter
maciuszek Jan 2, 2025
a46cf32
fix batch send failure remining batch logic
maciuszek Jan 2, 2025
51ee282
fix lint errors
maciuszek Jan 2, 2025
d3e4b19
improve batch send tests
maciuszek Jan 3, 2025
747fbc2
made batch configuration more granular
maciuszek Jan 3, 2025
9c6f78f
fix spelling
maciuszek Jan 3, 2025
25cd271
improve comments
maciuszek Jan 3, 2025
a8ec4d4
improve variable naming
maciuszek Jan 6, 2025
abc1e18
restructure
maciuszek Jan 6, 2025
108884c
refactor how we send metrics with an interface design optimizing loop…
maciuszek Jan 6, 2025
1a0e7d4
fix spelling for linter
maciuszek Jan 6, 2025
9b67c65
only create connection when we need to for batch sends
maciuszek Jan 7, 2025
49 changes: 34 additions & 15 deletions net_sink.go
@@ -118,8 +118,8 @@ func NewNetSink(opts ...SinkOption) FlushableSink {
bufSize = defaultBufferSizeTCP
}

s.outc = make(chan *bytes.Buffer, approxMaxMemBytes/bufSize)
s.retryc = make(chan *bytes.Buffer, 1) // It should be okay to limit this given we preferentially process from this over outc.
s.outc = make(chan *bytes.Buffer, approxMaxMemBytes/bufSize) // todo: need to understand why/how this number was chosen and probably elevate it
@maciuszek (Contributor Author) commented Dec 23, 2024:
Forking #169 (review) here to make threading possible. @sokada1221

So this doesn't restrict the memory allocated, but rather the number of slots per metric/string.
If that capacity is exceeded it blocks further stats from being written until some are read, so it wouldn't act as a batching mechanism 🤔. Buffered channels are a bit strange: I think in practice this buffer will always be full, and if we changed it to a normal unbuffered channel (always blocking) I don't think we would see an impact.
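
A minimal, hypothetical sketch (not part of this diff) of the blocking behavior described above: once a buffered channel is at capacity, further sends block until a receive frees a slot.

```go
package main

import "fmt"

func main() {
	// Toy stand-in for outc: capacity 2 instead of approxMaxMemBytes/bufSize.
	outc := make(chan string, 2)

	outc <- "metric-1" // fills slot 1
	outc <- "metric-2" // fills slot 2
	// A third send would block here until a receiver frees a slot:
	// outc <- "metric-3"

	fmt.Println(<-outc)               // draining one value frees a slot
	outc <- "metric-3"                // now succeeds without blocking
	fmt.Println(len(outc), cap(outc)) // prints: 2 2
}
```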

s.retryc = make(chan *bytes.Buffer, 1) // It should be okay to limit this given we preferentially process from this over outc.

writer := &sinkWriter{outc: s.outc}
s.bufWriter = bufio.NewWriterSize(writer, bufSize)
@@ -274,11 +274,16 @@ func (s *netSink) FlushTimer(name string, value float64) {

func (s *netSink) run() {
addr := net.JoinHostPort(s.conf.StatsdHost, strconv.Itoa(s.conf.StatsdPort))
batch := GetSettings().ForcedBatching

batchc := make(chan *bytes.Buffer, cap(s.outc))

var reconnectFailed bool // true if last reconnect failed

t := time.NewTicker(flushInterval)
defer t.Stop()

// metric write loop (actively draining content of outc channel)
for {
if s.conn == nil {
if err := s.connect(addr); err != nil {
Expand Down Expand Up @@ -306,38 +311,52 @@ func (s *netSink) run() {
s.handleFlushErrorSize(err, buf.Len())
s.mu.Unlock()
}
putBuffer(buf)
putBuffer(buf) // todo understand: the buffer we just sent successfully is returned to a pool; whenever a buffer is taken from that pool it is reset and its allocation reused for the next stat
continue
default:
// Drop through in case retryc has nothing.
}

// if the channel is at capacity, writers are blocked; signal a flush so it drains and unblocks
if len(s.outc) == cap(s.outc) {
s.Flush()
}

select {
case <-t.C:
s.flush()
case done := <-s.doFlush:
// Only flush pending buffers, this prevents an issue where
// continuous writes prevent the flush loop from exiting.
//
// If there is an error writeToConn() will set the conn to
// nil thus breaking the loop.
//
n := len(s.outc)
n := len(batchc)
for i := 0; i < n && s.conn != nil; i++ {
buf := <-s.outc
buf := <-batchc
if err := s.writeToConn(buf); err != nil {
s.retryc <- buf
continue
}
putBuffer(buf)
putBuffer(buf) // todo understand: the buffer we just sent successfully is returned to a pool; whenever a buffer is taken from that pool it is reset and its allocation reused for the next stat
}

close(done)
case <-t.C:
s.flush()
case buf := <-s.outc:
// Normally we send stats any time outc has data.
//
// Gauges and Counters are written to outc at a cadence of GOSTATS_FLUSH_INTERVAL_SECONDS;
// Timers are written to outc ad hoc.
//
// With batching we instead rely on doFlush, which is also driven by GOSTATS_FLUSH_INTERVAL_SECONDS.
//
// Side effects:
// * Implied Timer batching under the flush interval
// * ~1 second delay to Gauge and Counter writes, since they are batched into the next interval
if batch {
batchc <- buf
continue
}
if err := s.writeToConn(buf); err != nil {
s.retryc <- buf
continue
}
putBuffer(buf)
putBuffer(buf) // todo understand: the buffer we just sent successfully is returned to a pool; whenever a buffer is taken from that pool it is reset and its allocation reused for the next stat
}
}
}
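
For reference, a simplified sketch (illustrative names only, not the sink's real internals) of the pattern the new run() loop implements when forced batching is enabled: incoming buffers are parked on a batch channel and only drained when a flush is signalled.

```go
package sketch

// drainLoop models the forced-batching path: writes arriving on outc are
// parked on batchc when batch is true, and only sent when a flush is
// signalled on doFlush. Without batching, writes are sent immediately.
func drainLoop(outc, batchc chan []byte, doFlush chan chan struct{}, send func([]byte) error, batch bool) {
	for {
		select {
		case done := <-doFlush:
			// Drain only what is pending right now, so continuous writes
			// cannot keep the flush from completing.
			n := len(batchc)
			for i := 0; i < n; i++ {
				_ = send(<-batchc) // the real sink retries via retryc on error
			}
			close(done)
		case buf := <-outc:
			if batch {
				batchc <- buf // park until the next flush signal
				continue
			}
			_ = send(buf) // non-batching path sends as soon as data arrives
		}
	}
}
```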
62 changes: 62 additions & 0 deletions net_sink_test.go
@@ -847,6 +847,68 @@ func testNetSinkIntegration(t *testing.T, protocol string) {
})
}

func TestFlushTimerNoBatching(t *testing.T) {
err := os.Setenv("GOSTATS_FORCED_BATCHING", "false")
if err != nil {
t.Fatalf("Failed to set environment variable: %s", err)
}

expected := [...]string{
"timer_int:1|ms\n",
"timer_float:1.230000|ms\n",
}

ts, sink := setupTestNetSink(t, "tcp", false) // the protocol is arbitrary and unimportant for batching
defer ts.Close()

sink.FlushTimer("timer_int", 1)
sink.FlushTimer("timer_float", 1.23)

time.Sleep(2001 * time.Millisecond)

exp := strings.Join(expected[:], "")
buf := ts.String()
if buf != exp {
t.Errorf("Not all stats were written\ngot:\n%q\nwant:\n%q\n", buf, exp)
}

os.Unsetenv("GOSTATS_FORCED_BATCHING")
}

func TestFlushTimerBatching(t *testing.T) {
err := os.Setenv("GOSTATS_FORCED_BATCHING", "true")
if err != nil {
t.Fatalf("Failed to set environment variable: %s", err)
}

expected := [...]string{
"timer_int:1|ms\n",
"timer_float:1.230000|ms\n",
}

ts, sink := setupTestNetSink(t, "tcp", false) // the protocol is arbitrary and unimportant for batching
defer ts.Close()

sink.FlushTimer("timer_int", 1)
sink.FlushTimer("timer_float", 1.23)
time.Sleep(2001 * time.Millisecond)

if ts.String() != "" {
t.Errorf("Stats were written despite forced batching")
}

sink.Flush()
time.Sleep(2001 * time.Millisecond)

exp := strings.Join(expected[:], "")
buf := ts.String()
if buf != exp {
t.Errorf("Not all stats were written\ngot:\n%q\nwant:\n%q\n", buf, exp)
}

os.Unsetenv("GOSTATS_FORCED_BATCHING")
}

func TestNetSink_Integration_TCP(t *testing.T) {
testNetSinkIntegration(t, "tcp")
}
9 changes: 9 additions & 0 deletions settings.go
@@ -20,6 +20,8 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
// DefaultForcedBatching defines if we want to force batching by default
DefaultForcedBatching = false
)

// The Settings type is used to configure gostats. gostats uses environment
@@ -38,6 +40,8 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
// Force batching under FlushIntervalS for all metrics
ForcedBatching bool `envconfig:"GOSTATS_FORCED_BATCHING" default:"false"`
}

// An envError is an error that occurred parsing an environment variable
@@ -101,13 +105,18 @@ func GetSettings() Settings {
if err != nil {
panic(err)
}
forcedBatching, err := envBool("GOSTATS_FORCED_BATCHING", DefaultForcedBatching)
if err != nil {
panic(err)
}
return Settings{
UseStatsd: useStatsd,
StatsdHost: envOr("STATSD_HOST", DefaultStatsdHost),
StatsdProtocol: envOr("STATSD_PROTOCOL", DefaultStatsdProtocol),
StatsdPort: statsdPort,
FlushIntervalS: flushIntervalS,
LoggingSinkDisabled: loggingSinkDisabled,
ForcedBatching: forcedBatching,
}
}

9 changes: 5 additions & 4 deletions stats.go
@@ -313,7 +313,7 @@ func (t *timer) AddDuration(dur time.Duration) {
}

func (t *timer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value)
t.sink.FlushTimer(t.name, value) // writes the timer to a buffered channel; it is sent immediately unless GOSTATS_FORCED_BATCHING is set
}

func (t *timer) AllocateSpan() Timespan {
@@ -357,6 +357,7 @@ func (s *statStore) validateTags(tags map[string]string) {
}
}

// flush loop driven by the provided ticker
func (s *statStore) StartContext(ctx context.Context, ticker *time.Ticker) {
for {
select {
@@ -383,19 +384,19 @@ func (s *statStore) Flush() {
s.counters.Range(func(key, v interface{}) bool {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
s.sink.FlushCounter(key.(string), value)
s.sink.FlushCounter(key.(string), value) // writes the counter to a buffered channel; sent either right away or with the next batch, depending on GOSTATS_FORCED_BATCHING
}
return true
})

s.gauges.Range(func(key, v interface{}) bool {
s.sink.FlushGauge(key.(string), v.(*gauge).Value())
s.sink.FlushGauge(key.(string), v.(*gauge).Value()) // writes the gauge to a buffered channel; sent either right away or with the next batch, depending on GOSTATS_FORCED_BATCHING
return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
flushableSink.Flush() // signal the buffered channel (buffered counters, gauges and timers) to be drained and sent if there is anything to send
}
}
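
To make the flush path above concrete, here is a small usage sketch, assuming the github.com/lyft/gostats import path and its NewDefaultStore/Scope helpers: counters and gauges reach the sink on each store Flush, timers go to the sink as they are recorded, and the trailing FlushableSink.Flush signals the buffered channel to drain.

```go
package main

import (
	"context"
	"time"

	stats "github.com/lyft/gostats" // assumed import path
)

func main() {
	store := stats.NewDefaultStore()
	scope := store.Scope("example")

	// Periodic flush loop; each tick calls store.Flush(), which writes
	// counters and gauges to the sink and then calls FlushableSink.Flush().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	go store.StartContext(ctx, ticker)

	scope.NewCounter("requests").Inc()       // flushed on the next tick
	scope.NewGauge("in_flight").Set(3)       // flushed on the next tick
	scope.NewTimer("latency").AddValue(1.23) // written to the sink immediately (or batched)

	time.Sleep(2 * time.Second) // allow at least one flush interval to pass
}
```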

15 changes: 8 additions & 7 deletions stats_test.go
@@ -78,9 +78,9 @@ func TestValidateTags(t *testing.T) {
store.Flush()

expected := "test:1|c"
counter := sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test:1|c, got", counter)
output := sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, "reserved_tag") {
t.Errorf("Expected without reserved tags: '%s' Got: '%s'", expected, output)
}

// A reserved tag should trigger adding the reserved_tag counter
@@ -89,10 +89,11 @@
store.NewCounterWithTags("test", map[string]string{"host": "i"}).Inc()
store.Flush()

expected = "reserved_tag:1|c\ntest.__host=i:1|c"
counter = sink.record
if !strings.Contains(counter, expected) {
t.Error("wanted counter value of test.___f=i:1|c, got", counter)
expected = "test.__host=i:1|c"
@maciuszek (Contributor Author) commented Dec 23, 2024:

Note: this test is out of scope for this work, but it was previously flaky because the ordering of reserved_tag vs test.__host is not deterministic.

expectedReservedTag := "reserved_tag:1|c"
output = sink.record
if !strings.Contains(output, expected) && !strings.Contains(output, expectedReservedTag) {
t.Errorf("Expected: '%s' and '%s', In: '%s'", expected, expectedReservedTag, output)
}
}
