Skip to content

Commit

Permalink
Merge pull request #35 from hongjianzhu/master
Browse files Browse the repository at this point in the history
add sample rate support and a little refactor
  • Loading branch information
quipo authored Aug 1, 2017
2 parents 20e051c + b5a8083 commit 107a2dd
Showing 1 changed file with 129 additions and 26 deletions.
155 changes: 129 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package statsd

import (
"errors"
"fmt"
"log"
"math/rand"
"net"
"os"
"strings"
Expand All @@ -24,16 +26,21 @@ type Logger interface {
// func init() {
// statsd.UDPPayloadSize = 16 * 1024
// }
var UDPPayloadSize int = 512
var UDPPayloadSize = 512

// Hostname is exported so clients can set it to something different than the default
var Hostname string

var errNotConnected = fmt.Errorf("cannot send stats, not connected to StatsD server")

// errors
var (
ErrInvalidCount = errors.New("count is less than 0")
ErrInvalidSampleRate = errors.New("sample rate is larger than 1 or less then 0")
)

func init() {
host, err := os.Hostname()
if nil == err {
if host, err := os.Hostname(); nil == err {
Hostname = host
}
}
Expand Down Expand Up @@ -98,30 +105,70 @@ func (c *StatsdClient) Close() error {

// Incr - Increment a counter metric. Often used to note a particular event
func (c *StatsdClient) Incr(stat string, count int64) error {
if 0 != count {
return c.send(stat, "%d|c", count)
return c.IncrWithSampling(stat, count, 1)
}

// IncrWithSampling - Increment a counter metric with sampling between 0 and 1
func (c *StatsdClient) IncrWithSampling(stat string, count int64, sampleRate float32) error {
if err := checkSampleRate(sampleRate); err != nil {
return err
}
return nil

if !shouldFire(sampleRate) {
return nil // ignore this call
}

if err := checkCount(count); err != nil {
return err
}

return c.send(stat, "%d|c", count, sampleRate)
}

// Decr - Decrement a counter metric. Often used to note a particular event
func (c *StatsdClient) Decr(stat string, count int64) error {
if 0 != count {
return c.send(stat, "%d|c", -count)
return c.DecrWithSampling(stat, count, 1)
}

// DecrWithSampling - Decrement a counter metric with sampling between 0 and 1
func (c *StatsdClient) DecrWithSampling(stat string, count int64, sampleRate float32) error {
if err := checkSampleRate(sampleRate); err != nil {
return err
}
return nil

if !shouldFire(sampleRate) {
return nil // ignore this call
}

if err := checkCount(count); err != nil {
return err
}

return c.send(stat, "%d|c", -count, sampleRate)
}

// Timing - Track a duration event
// the time delta must be given in milliseconds
func (c *StatsdClient) Timing(stat string, delta int64) error {
return c.send(stat, "%d|ms", delta)
return c.TimingWithSampling(stat, delta, 1)
}

func (c *StatsdClient) TimingWithSampling(stat string, delta int64, sampleRate float32) error {
if err := checkSampleRate(sampleRate); err != nil {
return err
}

if !shouldFire(sampleRate) {
return nil // ignore this call
}

return c.send(stat, "%d|ms", delta, sampleRate)
}

// PrecisionTiming - Track a duration event
// the time delta has to be a duration
func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error {
return c.send(stat, "%.6f|ms", float64(delta)/float64(time.Millisecond))
return c.send(stat, "%.6f|ms", float64(delta)/float64(time.Millisecond), 1)
}

// Gauge - Gauges are a constant data type. They are not subject to averaging,
Expand All @@ -131,62 +178,92 @@ func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error {
// underlying protocol, you can't explicitly set a gauge to a negative number without
// first setting it to zero.
func (c *StatsdClient) Gauge(stat string, value int64) error {
return c.GaugeWithSampling(stat, value, 1)
}

func (c *StatsdClient) GaugeWithSampling(stat string, value int64, sampleRate float32) error {
if err := checkSampleRate(sampleRate); err != nil {
return err
}

if !shouldFire(sampleRate) {
return nil // ignore this call
}

if value < 0 {
c.send(stat, "%d|g", 0)
return c.send(stat, "%d|g", value)
c.send(stat, "%d|g", 0, 1)
}
return c.send(stat, "%d|g", value)

return c.send(stat, "%d|g", value, sampleRate)
}

// GaugeDelta -- Send a change for a gauge
func (c *StatsdClient) GaugeDelta(stat string, value int64) error {
// Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand
if value < 0 {
return c.send(stat, "%d|g", value)
return c.send(stat, "%d|g", value, 1)
}
return c.send(stat, "+%d|g", value)
return c.send(stat, "+%d|g", value, 1)
}

// FGauge -- Send a floating point value for a gauge
func (c *StatsdClient) FGauge(stat string, value float64) error {
return c.FGaugeWithSampling(stat, value, 1)
}

func (c *StatsdClient) FGaugeWithSampling(stat string, value float64, sampleRate float32) error {
if err := checkSampleRate(sampleRate); err != nil {
return err
}

if !shouldFire(sampleRate) {
return nil
}

if value < 0 {
c.send(stat, "%d|g", 0)
return c.send(stat, "%g|g", value)
c.send(stat, "%d|g", 0, 1)
}
return c.send(stat, "%g|g", value)

return c.send(stat, "%g|g", value, sampleRate)
}

// FGaugeDelta -- Send a floating point change for a gauge
func (c *StatsdClient) FGaugeDelta(stat string, value float64) error {
if value < 0 {
return c.send(stat, "%g|g", value)
return c.send(stat, "%g|g", value, 1)
}
return c.send(stat, "+%g|g", value)
return c.send(stat, "+%g|g", value, 1)
}

// Absolute - Send absolute-valued metric (not averaged/aggregated)
func (c *StatsdClient) Absolute(stat string, value int64) error {
return c.send(stat, "%d|a", value)
return c.send(stat, "%d|a", value, 1)
}

// FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated)
func (c *StatsdClient) FAbsolute(stat string, value float64) error {
return c.send(stat, "%g|a", value)
return c.send(stat, "%g|a", value, 1)
}

// Total - Send a metric that is continously increasing, e.g. read operations since boot
func (c *StatsdClient) Total(stat string, value int64) error {
return c.send(stat, "%d|t", value)
return c.send(stat, "%d|t", value, 1)
}

// write a UDP packet with the statsd event
func (c *StatsdClient) send(stat string, format string, value interface{}) error {
func (c *StatsdClient) send(stat string, format string, value interface{}, sampleRate float32) error {
if c.conn == nil {
return errNotConnected
}

stat = strings.Replace(stat, "%HOST%", Hostname, 1)
// if sending tcp append a newline
format = fmt.Sprintf(c.eventStringTpl, c.prefix, stat, format)

if sampleRate != 1 {
format = fmt.Sprintf("%s|@%f", format, sampleRate)
}

_, err := fmt.Fprintf(c.conn, format, value)
return err
}
Expand Down Expand Up @@ -214,7 +291,7 @@ func (c *StatsdClient) SendEvents(events map[string]event.Event) error {
}

var n int
var stats []string = make([]string, 0)
var stats = make([]string, 0)

for _, e := range events {
for _, stat := range e.Stats() {
Expand Down Expand Up @@ -247,3 +324,29 @@ func (c *StatsdClient) SendEvents(events map[string]event.Event) error {

return nil
}

func checkCount(c int64) error {
if c <= 0 {
return ErrInvalidCount
}

return nil
}

func checkSampleRate(r float32) error {
if r < 0 || r > 1 {
return ErrInvalidSampleRate
}

return nil
}

func shouldFire(sampleRate float32) bool {
if sampleRate == 1 {
return true
}

r := rand.New(rand.NewSource(time.Now().Unix()))

return r.Float32() <= sampleRate
}

0 comments on commit 107a2dd

Please sign in to comment.