From 43c1379c0577ac1eb74f9f3869cea07191c9992b Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 5 Feb 2017 22:57:49 -0500 Subject: [PATCH] Add multi reporters to support multiple backends for a single scope (#26) --- multi/README.md | 13 + multi/reporter.go | 198 ++++++++++++++ multi/reporter_test.go | 252 ++++++++++++++++++ statsd/README.md | 19 ++ statsd/{options.go => example/statsd_main.go} | 76 ++++-- statsd/reporter.go | 14 +- statsd/reporter_test.go | 2 +- 7 files changed, 544 insertions(+), 30 deletions(-) create mode 100644 multi/README.md create mode 100644 multi/reporter.go create mode 100644 multi/reporter_test.go create mode 100644 statsd/README.md rename statsd/{options.go => example/statsd_main.go} (50%) diff --git a/multi/README.md b/multi/README.md new file mode 100644 index 00000000..b8e67296 --- /dev/null +++ b/multi/README.md @@ -0,0 +1,13 @@ +# A buffered multi reporter + +Combine reporters to emit to many backends. + +Multiple `tally.StatsReporter` as a single reporter: +```go +reporter := NewMultiReporter(statsdReporter, ...) +``` + +Multiple `tally.CachedStatsReporter` as a single reporter: +```go +reporter := NewMultiCachedReporter(m3Reporter, promReporter, ...) +``` diff --git a/multi/reporter.go b/multi/reporter.go new file mode 100644 index 00000000..f5e5a7c4 --- /dev/null +++ b/multi/reporter.go @@ -0,0 +1,198 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package multi + +import ( + "time" + + "github.com/uber-go/tally" +) + +type multi struct { + multiBaseReporters multiBaseReporters + reporters []tally.StatsReporter +} + +// NewMultiReporter creates a new multi tally.StatsReporter. +func NewMultiReporter( + r ...tally.StatsReporter, +) tally.StatsReporter { + var baseReporters multiBaseReporters + for _, r := range r { + baseReporters = append(baseReporters, r) + } + return &multi{ + multiBaseReporters: baseReporters, + reporters: r, + } +} + +func (r *multi) ReportCounter( + name string, + tags map[string]string, + value int64, +) { + for _, r := range r.reporters { + r.ReportCounter(name, tags, value) + } +} + +func (r *multi) ReportGauge( + name string, + tags map[string]string, + value float64, +) { + for _, r := range r.reporters { + r.ReportGauge(name, tags, value) + } +} + +func (r *multi) ReportTimer( + name string, + tags map[string]string, + interval time.Duration, +) { + for _, r := range r.reporters { + r.ReportTimer(name, tags, interval) + } +} + +func (r *multi) Capabilities() tally.Capabilities { + return r.multiBaseReporters.Capabilities() +} + +func (r *multi) Flush() { + r.multiBaseReporters.Flush() +} + +type multiCached struct { + multiBaseReporters multiBaseReporters + reporters []tally.CachedStatsReporter +} + +// NewMultiCachedReporter creates a new multi tally.CachedStatsReporter. +func NewMultiCachedReporter( + r ...tally.CachedStatsReporter, +) tally.CachedStatsReporter { + var baseReporters multiBaseReporters + for _, r := range r { + baseReporters = append(baseReporters, r) + } + return &multiCached{ + multiBaseReporters: baseReporters, + reporters: r, + } +} + +func (r *multiCached) AllocateCounter( + name string, + tags map[string]string, +) tally.CachedCount { + metrics := make([]tally.CachedCount, 0, len(r.reporters)) + for _, r := range r.reporters { + metrics = append(metrics, r.AllocateCounter(name, tags)) + } + return multiMetric{counters: metrics} +} + +func (r *multiCached) AllocateGauge( + name string, + tags map[string]string, +) tally.CachedGauge { + metrics := make([]tally.CachedGauge, 0, len(r.reporters)) + for _, r := range r.reporters { + metrics = append(metrics, r.AllocateGauge(name, tags)) + } + return multiMetric{gauges: metrics} +} + +func (r *multiCached) AllocateTimer( + name string, + tags map[string]string, +) tally.CachedTimer { + metrics := make([]tally.CachedTimer, 0, len(r.reporters)) + for _, r := range r.reporters { + metrics = append(metrics, r.AllocateTimer(name, tags)) + } + return multiMetric{timers: metrics} +} + +func (r *multiCached) Capabilities() tally.Capabilities { + return r.multiBaseReporters.Capabilities() +} + +func (r *multiCached) Flush() { + r.multiBaseReporters.Flush() +} + +type multiMetric struct { + counters []tally.CachedCount + gauges []tally.CachedGauge + timers []tally.CachedTimer +} + +func (m multiMetric) ReportCount(value int64) { + for _, m := range m.counters { + m.ReportCount(value) + } +} + +func (m multiMetric) ReportGauge(value float64) { + for _, m := range m.gauges { + m.ReportGauge(value) + } +} + +func (m multiMetric) ReportTimer(interval time.Duration) { + for _, m := range m.timers { + m.ReportTimer(interval) + } +} + +type multiBaseReporters []tally.BaseStatsReporter + +func (r multiBaseReporters) Capabilities() tally.Capabilities { + c := &capabilities{reporting: true, tagging: true} + for _, r := range r { + c.reporting = c.reporting && r.Capabilities().Reporting() + c.tagging = c.tagging && r.Capabilities().Tagging() + } + return c +} + +func (r multiBaseReporters) Flush() { + for _, r := range r { + r.Flush() + } +} + +type capabilities struct { + reporting bool + tagging bool +} + +func (c *capabilities) Reporting() bool { + return c.reporting +} + +func (c *capabilities) Tagging() bool { + return c.tagging +} diff --git a/multi/reporter_test.go b/multi/reporter_test.go new file mode 100644 index 00000000..ad2ffb9e --- /dev/null +++ b/multi/reporter_test.go @@ -0,0 +1,252 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package multi + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" +) + +func TestMultiReporter(t *testing.T) { + a, b, c := + newCapturingStatsReporter(), + newCapturingStatsReporter(), + newCapturingStatsReporter() + all := []*capturingStatsReporter{a, b, c} + + r := NewMultiReporter(a, b, c) + + tags := []map[string]string{ + {"foo": "bar"}, + {"foo": "baz"}, + {"foo": "qux"}, + } + + r.ReportCounter("foo", tags[0], 42) + r.ReportCounter("foo", tags[0], 84) + r.ReportGauge("baz", tags[1], 42.0) + r.ReportTimer("qux", tags[2], 126*time.Millisecond) + for _, r := range all { + require.Equal(t, 2, len(r.counts)) + + assert.Equal(t, "foo", r.counts[0].name) + assert.Equal(t, tags[0], r.counts[0].tags) + assert.Equal(t, int64(42), r.counts[0].value) + + assert.Equal(t, "foo", r.counts[1].name) + assert.Equal(t, tags[0], r.counts[1].tags) + assert.Equal(t, int64(84), r.counts[1].value) + + assert.Equal(t, "baz", r.gauges[0].name) + assert.Equal(t, tags[1], r.gauges[0].tags) + assert.Equal(t, float64(42.0), r.gauges[0].value) + + assert.Equal(t, "qux", r.timers[0].name) + assert.Equal(t, tags[2], r.timers[0].tags) + assert.Equal(t, 126*time.Millisecond, r.timers[0].value) + } + + assert.NotNil(t, r.Capabilities()) + + r.Flush() + for _, r := range all { + assert.Equal(t, 1, r.flush) + } +} + +func TestMultiCachedReporter(t *testing.T) { + a, b, c := + newCapturingStatsReporter(), + newCapturingStatsReporter(), + newCapturingStatsReporter() + all := []*capturingStatsReporter{a, b, c} + + r := NewMultiCachedReporter(a, b, c) + + tags := []map[string]string{ + {"foo": "bar"}, + {"foo": "baz"}, + {"foo": "qux"}, + } + + ctr := r.AllocateCounter("foo", tags[0]) + ctr.ReportCount(42) + ctr.ReportCount(84) + + gauge := r.AllocateGauge("baz", tags[1]) + gauge.ReportGauge(42.0) + + tmr := r.AllocateTimer("qux", tags[2]) + tmr.ReportTimer(126 * time.Millisecond) + + for _, r := range all { + require.Equal(t, 2, len(r.counts)) + + assert.Equal(t, "foo", r.counts[0].name) + assert.Equal(t, tags[0], r.counts[0].tags) + assert.Equal(t, int64(42), r.counts[0].value) + + assert.Equal(t, "foo", r.counts[1].name) + assert.Equal(t, tags[0], r.counts[1].tags) + assert.Equal(t, int64(84), r.counts[1].value) + + assert.Equal(t, "baz", r.gauges[0].name) + assert.Equal(t, tags[1], r.gauges[0].tags) + assert.Equal(t, float64(42.0), r.gauges[0].value) + + assert.Equal(t, "qux", r.timers[0].name) + assert.Equal(t, tags[2], r.timers[0].tags) + assert.Equal(t, 126*time.Millisecond, r.timers[0].value) + } + + assert.NotNil(t, r.Capabilities()) + + r.Flush() + for _, r := range all { + assert.Equal(t, 1, r.flush) + } +} + +type capturingStatsReporter struct { + counts []capturedCount + gauges []capturedGauge + timers []capturedTimer + capabilities int + flush int +} + +type capturedCount struct { + name string + tags map[string]string + value int64 +} + +type capturedGauge struct { + name string + tags map[string]string + value float64 +} + +type capturedTimer struct { + name string + tags map[string]string + value time.Duration +} + +func newCapturingStatsReporter() *capturingStatsReporter { + return &capturingStatsReporter{} +} + +func (r *capturingStatsReporter) ReportCounter( + name string, + tags map[string]string, + value int64, +) { + r.counts = append(r.counts, capturedCount{name, tags, value}) +} + +func (r *capturingStatsReporter) ReportGauge( + name string, + tags map[string]string, + value float64, +) { + r.gauges = append(r.gauges, capturedGauge{name, tags, value}) +} + +func (r *capturingStatsReporter) ReportTimer( + name string, + tags map[string]string, + value time.Duration, +) { + r.timers = append(r.timers, capturedTimer{name, tags, value}) +} + +func (r *capturingStatsReporter) AllocateCounter( + name string, + tags map[string]string, +) tally.CachedCount { + return cachedCount{fn: func(value int64) { + r.counts = append(r.counts, capturedCount{name, tags, value}) + }} +} + +func (r *capturingStatsReporter) AllocateGauge( + name string, + tags map[string]string, +) tally.CachedGauge { + return cachedGauge{fn: func(value float64) { + r.gauges = append(r.gauges, capturedGauge{name, tags, value}) + }} +} + +func (r *capturingStatsReporter) AllocateTimer( + name string, + tags map[string]string, +) tally.CachedTimer { + return cachedTimer{fn: func(value time.Duration) { + r.timers = append(r.timers, capturedTimer{name, tags, value}) + }} +} + +func (r *capturingStatsReporter) Capabilities() tally.Capabilities { + r.capabilities++ + return r +} + +func (r *capturingStatsReporter) Reporting() bool { + return true +} + +func (r *capturingStatsReporter) Tagging() bool { + return true +} + +func (r *capturingStatsReporter) Flush() { + r.flush++ +} + +type cachedCount struct { + fn func(value int64) +} + +func (c cachedCount) ReportCount(value int64) { + c.fn(value) +} + +type cachedGauge struct { + fn func(value float64) +} + +func (c cachedGauge) ReportGauge(value float64) { + c.fn(value) +} + +type cachedTimer struct { + fn func(value time.Duration) +} + +func (c cachedTimer) ReportTimer(value time.Duration) { + c.fn(value) +} diff --git a/statsd/README.md b/statsd/README.md new file mode 100644 index 00000000..29ec5b30 --- /dev/null +++ b/statsd/README.md @@ -0,0 +1,19 @@ +# A buffered statsd reporter + +See `examples/statsd_main.go` for an end to end example. + +## Options + +You can use either a basic or a buffered statsd client +and pass it to the reporter along with options. + +The reporter options are: + +```go +// Options is a set of options for the tally reporter. +type Options struct { + // SampleRate is the metrics emission sample rate. If you + // do not set this value it will be set to 1. + SampleRate float32 +} +``` diff --git a/statsd/options.go b/statsd/example/statsd_main.go similarity index 50% rename from statsd/options.go rename to statsd/example/statsd_main.go index fc5ca6a0..06aa76df 100644 --- a/statsd/options.go +++ b/statsd/example/statsd_main.go @@ -18,38 +18,62 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package statsd +package main -const ( - defaultSampleRate = 1.0 -) +import ( + "log" + "math/rand" + "time" -// Options represents a set of statsd stats reporter options -type Options interface { - // SampleRate returns the sample rate - SampleRate() float32 + "github.com/cactus/go-statsd-client/statsd" - // SetSampleRate sets the sample rate and returns new options with the value set - SetSampleRate(value float32) Options -} + "github.com/uber-go/tally" + statsdreporter "github.com/uber-go/tally/statsd" +) -// NewOptions creates a new set of statsd stats reporter options -func NewOptions() Options { - return &options{ - sampleRate: defaultSampleRate, +// To view statsd emitted metrics locally you can use +// netcat with "nc 8125 -l -u" +func main() { + statter, err := statsd.NewBufferedClient("127.0.0.1:8125", + "stats", 100*time.Millisecond, 1440) + if err != nil { + log.Fatalf("could not create statsd client: %v", err) } -} -type options struct { - sampleRate float32 -} + opts := statsdreporter.Options{} + r := statsdreporter.NewReporter(statter, opts) -func (o *options) SampleRate() float32 { - return o.sampleRate -} + scope, closer := tally.NewRootScope("my-service", map[string]string{}, + r, 1*time.Second, tally.DefaultSeparator) + defer closer.Close() + + counter := scope.Counter("test-counter") + + gauge := scope.Gauge("test-gauge") + + timer := scope.Timer("test-timer") + + go func() { + for { + counter.Inc(1) + time.Sleep(time.Second) + } + }() + + go func() { + for { + gauge.Update(rand.Float64() * 1000) + time.Sleep(time.Second) + } + }() + + go func() { + for { + sw := timer.Start() + time.Sleep(time.Duration(rand.Float64() * float64(time.Second))) + sw.Stop() + } + }() -func (o *options) SetSampleRate(value float32) Options { - opts := *o - opts.sampleRate = value - return &opts + select {} } diff --git a/statsd/reporter.go b/statsd/reporter.go index e5ec3a71..7e9da95e 100644 --- a/statsd/reporter.go +++ b/statsd/reporter.go @@ -32,15 +32,23 @@ type cactusStatsReporter struct { sampleRate float32 } +// Options is a set of options for the tally reporter. +type Options struct { + // SampleRate is the metrics emission sample rate. If you + // do not set this value it will be set to 1. + SampleRate float32 +} + // NewReporter wraps a statsd.Statter for use with tally. Use either // statsd.NewClient or statsd.NewBufferedClient. func NewReporter(statsd statsd.Statter, opts Options) tally.StatsReporter { - if opts == nil { - opts = NewOptions() + var nilSampleRate float32 + if opts.SampleRate == nilSampleRate { + opts.SampleRate = 1.0 } return &cactusStatsReporter{ statter: statsd, - sampleRate: opts.SampleRate(), + sampleRate: opts.SampleRate, } } diff --git a/statsd/reporter_test.go b/statsd/reporter_test.go index 15ec78e6..75b7647b 100644 --- a/statsd/reporter_test.go +++ b/statsd/reporter_test.go @@ -27,7 +27,7 @@ import ( ) func TestCapabilities(t *testing.T) { - r := NewReporter(nil, nil) + r := NewReporter(nil, Options{}) assert.True(t, r.Capabilities().Reporting()) assert.False(t, r.Capabilities().Tagging()) }