Skip to content

Commit

Permalink
Fix possible flush after close semantics for scope (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Feb 19, 2017
1 parent 3349347 commit 34be4a5
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 35 deletions.
60 changes: 44 additions & 16 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type scope struct {
defaultBuckets Buckets

registry *scopeRegistry
quit chan struct{}
status scopeStatus

cm sync.RWMutex
gm sync.RWMutex
Expand All @@ -79,6 +79,12 @@ type scope struct {
histograms map[string]*histogram
}

type scopeStatus struct {
sync.RWMutex
closed bool
quit chan struct{}
}

type scopeRegistry struct {
sync.RWMutex
subscopes map[string]*scope
Expand Down Expand Up @@ -147,7 +153,10 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
registry: &scopeRegistry{
subscopes: make(map[string]*scope),
},
quit: make(chan struct{}, 1),
status: scopeStatus{
closed: false,
quit: make(chan struct{}, 1),
},

counters: make(map[string]*counter),
gauges: make(map[string]*gauge),
Expand Down Expand Up @@ -217,27 +226,42 @@ func (s *scope) cachedReport(c CachedStatsReporter) {
// reportLoop is used by the root scope for periodic reporting
func (s *scope) reportLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.registry.RLock()
if s.reporter != nil {
for _, ss := range s.registry.subscopes {
ss.report(s.reporter)
}
} else if s.cachedReporter != nil {
for _, ss := range s.registry.subscopes {
ss.cachedReport(s.cachedReporter)
}
}

s.registry.RUnlock()
case <-s.quit:
s.reportLoopRun()
case <-s.status.quit:
return
}
}
}

func (s *scope) reportLoopRun() {
// Need to hold a status lock to ensure not to report
// and flush after a close
s.status.RLock()
if s.status.closed {
s.status.RUnlock()
return
}

s.registry.RLock()
if s.reporter != nil {
for _, ss := range s.registry.subscopes {
ss.report(s.reporter)
}
} else if s.cachedReporter != nil {
for _, ss := range s.registry.subscopes {
ss.cachedReport(s.cachedReporter)
}
}
s.registry.RUnlock()

s.status.RUnlock()
}

func (s *scope) Counter(name string) Counter {
s.cm.RLock()
val, ok := s.counters[name]
Expand Down Expand Up @@ -440,7 +464,11 @@ func (s *scope) Snapshot() Snapshot {
}

func (s *scope) Close() error {
close(s.quit)
s.status.Lock()
s.status.closed = true
close(s.status.quit)
s.status.Unlock()

if closer, ok := s.baseReporter.(io.Closer); ok {
return closer.Close()
}
Expand Down
81 changes: 62 additions & 19 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package tally

import (
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -80,6 +81,8 @@ type testStatsReporter struct {
gauges map[string]*testFloatValue
timers map[string]*testIntValue
histograms map[string]*testHistogramValue

flushes int32
}

// newTestStatsReporter returns a new TestStatsReporter
Expand Down Expand Up @@ -248,19 +251,23 @@ func (r *testStatsReporter) Capabilities() Capabilities {
return capabilitiesReportingNoTagging
}

func (r *testStatsReporter) Flush() {}
func (r *testStatsReporter) Flush() {
atomic.AddInt32(&r.flushes, 1)
}

func TestWriteTimerImmediately(t *testing.T) {
r := newTestStatsReporter()
s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()
r.tg.Add(1)
s.Timer("ticky").Record(time.Millisecond * 175)
r.tg.Wait()
}

func TestWriteTimerClosureImmediately(t *testing.T) {
r := newTestStatsReporter()
s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()
r.tg.Add(1)
tm := s.Timer("ticky")
tm.Start().Stop()
Expand All @@ -269,8 +276,8 @@ func TestWriteTimerClosureImmediately(t *testing.T) {

func TestWriteReportLoop(t *testing.T) {
r := newTestStatsReporter()
s, close := NewRootScope(ScopeOptions{Reporter: r}, 10)
defer close.Close()
s, closer := NewRootScope(ScopeOptions{Reporter: r}, 10)
defer closer.Close()

r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -287,8 +294,8 @@ func TestWriteReportLoop(t *testing.T) {

func TestCachedReportLoop(t *testing.T) {
r := newTestStatsReporter()
s, close := NewRootScope(ScopeOptions{CachedReporter: r}, 10)
defer close.Close()
s, closer := NewRootScope(ScopeOptions{CachedReporter: r}, 10)
defer closer.Close()

r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -306,7 +313,9 @@ func TestCachedReportLoop(t *testing.T) {
func TestWriteOnce(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)

r.cg.Add(1)
Expand Down Expand Up @@ -338,7 +347,9 @@ func TestWriteOnce(t *testing.T) {
func TestCachedReporter(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{CachedReporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{CachedReporter: r}, 0)
defer closer.Close()

s := root.(*scope)

r.cg.Add(1)
Expand Down Expand Up @@ -366,7 +377,9 @@ func TestCachedReporter(t *testing.T) {
func TestRootScopeWithoutPrefix(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)
r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -391,7 +404,9 @@ func TestRootScopeWithoutPrefix(t *testing.T) {
func TestRootScopeWithPrefix(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)
r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -416,7 +431,9 @@ func TestRootScopeWithPrefix(t *testing.T) {
func TestRootScopeWithDifferentSeparator(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Separator: "_", Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Separator: "_", Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)
r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -441,7 +458,9 @@ func TestRootScopeWithDifferentSeparator(t *testing.T) {
func TestSubScope(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0)
defer closer.Close()

s := root.SubScope("mork").(*scope)
r.cg.Add(1)
s.Counter("bar").Inc(1)
Expand All @@ -467,7 +486,9 @@ func TestTaggedSubScope(t *testing.T) {
r := newTestStatsReporter()

ts := map[string]string{"env": "test"}
root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Tags: ts, Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Tags: ts, Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)

tscope := root.Tagged(map[string]string{"service": "test"}).(*scope)
Expand Down Expand Up @@ -514,7 +535,8 @@ func TestTaggedExistingReturnsSameScope(t *testing.T) {
nil,
{"env": "test"},
} {
root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Tags: initialTags, Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Tags: initialTags, Reporter: r}, 0)
defer closer.Close()

rootScope := root.(*scope)
fooScope := root.Tagged(map[string]string{"foo": "bar"}).(*scope)
Expand Down Expand Up @@ -565,13 +587,15 @@ func TestSnapshot(t *testing.T) {

func TestCapabilities(t *testing.T) {
r := newTestStatsReporter()
s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()
assert.True(t, s.Capabilities().Reporting())
assert.False(t, s.Capabilities().Tagging())
}

func TestCapabilitiesNoReporter(t *testing.T) {
s, _ := NewRootScope(ScopeOptions{}, 0)
s, closer := NewRootScope(ScopeOptions{}, 0)
defer closer.Close()
assert.False(t, s.Capabilities().Reporting())
assert.False(t, s.Capabilities().Tagging())
}
Expand All @@ -583,7 +607,7 @@ func TestNilTagMerge(t *testing.T) {
func TestScopeDefaultBuckets(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{
root, closer := NewRootScope(ScopeOptions{
DefaultBuckets: DurationBuckets{
0 * time.Millisecond,
30 * time.Millisecond,
Expand All @@ -593,6 +617,8 @@ func TestScopeDefaultBuckets(t *testing.T) {
},
Reporter: r,
}, 0)
defer closer.Close()

s := root.(*scope)
r.hg.Add(2)
s.Histogram("baz", DefaultBuckets).RecordDuration(42 * time.Millisecond)
Expand All @@ -619,7 +645,9 @@ func newTestMets(scope Scope) testMets {
func TestReturnByValue(t *testing.T) {
r := newTestStatsReporter()

root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0)
root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)
defer closer.Close()

s := root.(*scope)
mets := newTestMets(s)

Expand All @@ -630,3 +658,18 @@ func TestReturnByValue(t *testing.T) {

assert.EqualValues(t, 3, r.counters["honk"].val)
}

func TestScopeAvoidReportLoopRunOnClose(t *testing.T) {
r := newTestStatsReporter()
root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0)

s := root.(*scope)
s.reportLoopRun()

assert.Equal(t, int32(1), atomic.LoadInt32(&r.flushes))

assert.NoError(t, closer.Close())

s.reportLoopRun()
assert.Equal(t, int32(1), atomic.LoadInt32(&r.flushes))
}

0 comments on commit 34be4a5

Please sign in to comment.