Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maciuszek committed Dec 24, 2024
1 parent 101187d commit 972503c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
14 changes: 7 additions & 7 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func (w *sinkWriter) Write(p []byte) (int, error) {
}

func (s *netSink) Flush() {
// flushed buffer to outc which will write immediately
// flushed buffer to outc which will send immediately without batching
if s.flush() != nil {
return // nothing we can do
}
ch := make(chan struct{})
s.doFlush <- ch // send batches
s.doFlush <- ch // send collected outc batches
<-ch
}

Expand Down Expand Up @@ -319,29 +319,29 @@ func (s *netSink) run() {
// Drop through in case retryc has nothing.
}

// send batch if full
// send batched outc data anytime we're full and can't batch anymore
if len(batch) == cap(batch) {
batch = s.sendBatch(batch)
}

// flush to outc and batch and/or send outc
select {
case done := <-s.doFlush:
// send batch on doFlush signal
// send batched outc data
batch = s.sendBatch(batch)
close(done)
case buf := <-s.outc:
if isBatchEnabled {
// Batch outc data rely on doFlush signal to send
// Batch outc data and rely on the doFlush signal to send
batch = append(batch, *buf)
continue
}
// Without batching we will send stats if outc has data
// Send outc data immediately
if err := s.send(buf); err == nil {
putBuffer(buf)
}
case <-t.C:
s.flush() // from a higher level, stats write to s.bufWriter at GOSTATS_FLUSH_INTERVAL_SECONDS (or adhoc for Timers). this flushes them to outc every t.C tick
s.flush() // from a higher level, stats are written to s.bufWriter.buf at GOSTATS_FLUSH_INTERVAL_SECONDS (or adhoc for Timers). this flushes them to outc every t.C tick
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
DefaultFlushIntervalS = 5
// DefaultLoggingSinkDisabled is the default behavior of logging sink suppression, default is false.
DefaultLoggingSinkDisabled = false
// DefaultBatchSize implies if batching is enabled by default and the amount to batch
// DefaultBatchSize is the default maximum amount of stats we batch before sending, default is 0 which disables batching.
DefaultBatchSize = 0
)

Expand All @@ -40,7 +40,7 @@ type Settings struct {
// Disable the LoggingSink when USE_STATSD is false and use the NullSink instead.
// This will cause all stats to be silently dropped.
LoggingSinkDisabled bool `envconfig:"GOSTATS_LOGGING_SINK_DISABLED" default:"false"`
// Amount of stats to batch before sending. 0 is disabled.
// Max stats we batch before sending. 0 is disabled.
BatchSize int `envconfig:"GOSTATS_BATCH_SIZE" default:"0"`
}

Expand Down
10 changes: 5 additions & 5 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (t *timer) AddDuration(dur time.Duration) {
}

func (t *timer) AddValue(value float64) {
t.sink.FlushTimer(t.name, value) // writes the timer value to the buffer which will be flushed to outc every second, will be sent immediately when flushed to outc is batching is not enabeld
t.sink.FlushTimer(t.name, value) // writes the timer value to the buffer which will be flushed to outc at an interval, will be sent immediately when flushed to outc if batching is not enabeld
}

func (t *timer) AllocateSpan() Timespan {
Expand Down Expand Up @@ -357,7 +357,7 @@ func (s *statStore) validateTags(tags map[string]string) {
}
}

// stat buffer and flush loop based on specified ticker
// stat buffer and flush loop based on the specified ticker
func (s *statStore) StartContext(ctx context.Context, ticker *time.Ticker) {
for {
select {
Expand All @@ -384,19 +384,19 @@ func (s *statStore) Flush() {
s.counters.Range(func(key, v interface{}) bool {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
s.sink.FlushCounter(key.(string), value) // writes counters to buffer which will be flushed to both outc every second and at the end of this stack
s.sink.FlushCounter(key.(string), value) // writes counters to buffer which will be flushed to outc both at an interval and at the end of this stack
}
return true
})

s.gauges.Range(func(key, v interface{}) bool {
s.sink.FlushGauge(key.(string), v.(*gauge).Value()) // writes gauages to buffer which will be flushed to outc both every second and at the end of this stack
s.sink.FlushGauge(key.(string), v.(*gauge).Value()) // writes gauages to buffer which will be flushed to outc both at an interval and at the end of this stack
return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush() // flushes buffer to outc which directly writes if batching is disabled and signals doFlush to send outc data if batching is enabaled
flushableSink.Flush() // flushes buffer to outc which directly writes if batching is disabled and signals doFlush to send batched outc data if batching is enabaled
}
}

Expand Down

0 comments on commit 972503c

Please sign in to comment.