From b5a8083104a5f2a70a89bdd1fd8b41ec797bd647 Mon Sep 17 00:00:00 2001 From: hongjianzhu Date: Fri, 8 Apr 2016 16:24:37 +0800 Subject: [PATCH] add sample rate support --- client.go | 155 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 129 insertions(+), 26 deletions(-) diff --git a/client.go b/client.go index dd533de..cff2af7 100644 --- a/client.go +++ b/client.go @@ -1,8 +1,10 @@ package statsd import ( + "errors" "fmt" "log" + "math/rand" "net" "os" "strings" @@ -24,16 +26,21 @@ type Logger interface { // func init() { // statsd.UDPPayloadSize = 16 * 1024 // } -var UDPPayloadSize int = 512 +var UDPPayloadSize = 512 // Hostname is exported so clients can set it to something different than the default var Hostname string var errNotConnected = fmt.Errorf("cannot send stats, not connected to StatsD server") +// errors +var ( + ErrInvalidCount = errors.New("count is less than 0") + ErrInvalidSampleRate = errors.New("sample rate is larger than 1 or less then 0") +) + func init() { - host, err := os.Hostname() - if nil == err { + if host, err := os.Hostname(); nil == err { Hostname = host } } @@ -98,30 +105,70 @@ func (c *StatsdClient) Close() error { // Incr - Increment a counter metric. Often used to note a particular event func (c *StatsdClient) Incr(stat string, count int64) error { - if 0 != count { - return c.send(stat, "%d|c", count) + return c.IncrWithSampling(stat, count, 1) +} + +// IncrWithSampling - Increment a counter metric with sampling between 0 and 1 +func (c *StatsdClient) IncrWithSampling(stat string, count int64, sampleRate float32) error { + if err := checkSampleRate(sampleRate); err != nil { + return err } - return nil + + if !shouldFire(sampleRate) { + return nil // ignore this call + } + + if err := checkCount(count); err != nil { + return err + } + + return c.send(stat, "%d|c", count, sampleRate) } // Decr - Decrement a counter metric. Often used to note a particular event func (c *StatsdClient) Decr(stat string, count int64) error { - if 0 != count { - return c.send(stat, "%d|c", -count) + return c.DecrWithSampling(stat, count, 1) +} + +// DecrWithSampling - Decrement a counter metric with sampling between 0 and 1 +func (c *StatsdClient) DecrWithSampling(stat string, count int64, sampleRate float32) error { + if err := checkSampleRate(sampleRate); err != nil { + return err } - return nil + + if !shouldFire(sampleRate) { + return nil // ignore this call + } + + if err := checkCount(count); err != nil { + return err + } + + return c.send(stat, "%d|c", -count, sampleRate) } // Timing - Track a duration event // the time delta must be given in milliseconds func (c *StatsdClient) Timing(stat string, delta int64) error { - return c.send(stat, "%d|ms", delta) + return c.TimingWithSampling(stat, delta, 1) +} + +func (c *StatsdClient) TimingWithSampling(stat string, delta int64, sampleRate float32) error { + if err := checkSampleRate(sampleRate); err != nil { + return err + } + + if !shouldFire(sampleRate) { + return nil // ignore this call + } + + return c.send(stat, "%d|ms", delta, sampleRate) } // PrecisionTiming - Track a duration event // the time delta has to be a duration func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { - return c.send(stat, "%.6f|ms", float64(delta)/float64(time.Millisecond)) + return c.send(stat, "%.6f|ms", float64(delta)/float64(time.Millisecond), 1) } // Gauge - Gauges are a constant data type. They are not subject to averaging, @@ -131,62 +178,92 @@ func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { // underlying protocol, you can't explicitly set a gauge to a negative number without // first setting it to zero. func (c *StatsdClient) Gauge(stat string, value int64) error { + return c.GaugeWithSampling(stat, value, 1) +} + +func (c *StatsdClient) GaugeWithSampling(stat string, value int64, sampleRate float32) error { + if err := checkSampleRate(sampleRate); err != nil { + return err + } + + if !shouldFire(sampleRate) { + return nil // ignore this call + } + if value < 0 { - c.send(stat, "%d|g", 0) - return c.send(stat, "%d|g", value) + c.send(stat, "%d|g", 0, 1) } - return c.send(stat, "%d|g", value) + + return c.send(stat, "%d|g", value, sampleRate) } // GaugeDelta -- Send a change for a gauge func (c *StatsdClient) GaugeDelta(stat string, value int64) error { // Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand if value < 0 { - return c.send(stat, "%d|g", value) + return c.send(stat, "%d|g", value, 1) } - return c.send(stat, "+%d|g", value) + return c.send(stat, "+%d|g", value, 1) } // FGauge -- Send a floating point value for a gauge func (c *StatsdClient) FGauge(stat string, value float64) error { + return c.FGaugeWithSampling(stat, value, 1) +} + +func (c *StatsdClient) FGaugeWithSampling(stat string, value float64, sampleRate float32) error { + if err := checkSampleRate(sampleRate); err != nil { + return err + } + + if !shouldFire(sampleRate) { + return nil + } + if value < 0 { - c.send(stat, "%d|g", 0) - return c.send(stat, "%g|g", value) + c.send(stat, "%d|g", 0, 1) } - return c.send(stat, "%g|g", value) + + return c.send(stat, "%g|g", value, sampleRate) } // FGaugeDelta -- Send a floating point change for a gauge func (c *StatsdClient) FGaugeDelta(stat string, value float64) error { if value < 0 { - return c.send(stat, "%g|g", value) + return c.send(stat, "%g|g", value, 1) } - return c.send(stat, "+%g|g", value) + return c.send(stat, "+%g|g", value, 1) } // Absolute - Send absolute-valued metric (not averaged/aggregated) func (c *StatsdClient) Absolute(stat string, value int64) error { - return c.send(stat, "%d|a", value) + return c.send(stat, "%d|a", value, 1) } // FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated) func (c *StatsdClient) FAbsolute(stat string, value float64) error { - return c.send(stat, "%g|a", value) + return c.send(stat, "%g|a", value, 1) } // Total - Send a metric that is continously increasing, e.g. read operations since boot func (c *StatsdClient) Total(stat string, value int64) error { - return c.send(stat, "%d|t", value) + return c.send(stat, "%d|t", value, 1) } // write a UDP packet with the statsd event -func (c *StatsdClient) send(stat string, format string, value interface{}) error { +func (c *StatsdClient) send(stat string, format string, value interface{}, sampleRate float32) error { if c.conn == nil { return errNotConnected } + stat = strings.Replace(stat, "%HOST%", Hostname, 1) // if sending tcp append a newline format = fmt.Sprintf(c.eventStringTpl, c.prefix, stat, format) + + if sampleRate != 1 { + format = fmt.Sprintf("%s|@%f", format, sampleRate) + } + _, err := fmt.Fprintf(c.conn, format, value) return err } @@ -214,7 +291,7 @@ func (c *StatsdClient) SendEvents(events map[string]event.Event) error { } var n int - var stats []string = make([]string, 0) + var stats = make([]string, 0) for _, e := range events { for _, stat := range e.Stats() { @@ -247,3 +324,29 @@ func (c *StatsdClient) SendEvents(events map[string]event.Event) error { return nil } + +func checkCount(c int64) error { + if c <= 0 { + return ErrInvalidCount + } + + return nil +} + +func checkSampleRate(r float32) error { + if r < 0 || r > 1 { + return ErrInvalidSampleRate + } + + return nil +} + +func shouldFire(sampleRate float32) bool { + if sampleRate == 1 { + return true + } + + r := rand.New(rand.NewSource(time.Now().Unix())) + + return r.Float32() <= sampleRate +}