Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] replace sync.Map with lru.Cache #158

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/lyft/gostats

go 1.18

require github.com/kelseyhightower/envconfig v1.4.0
require (
github.com/hashicorp/golang-lru/v2 v2.0.3
github.com/kelseyhightower/envconfig v1.4.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/hashicorp/golang-lru/v2 v2.0.3 h1:kmRrRLlInXvng0SmLxmQpQkpbYAvcXm7NPDrgxJa9mE=
github.com/hashicorp/golang-lru/v2 v2.0.3/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
55 changes: 37 additions & 18 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"

tagspkg "github.com/lyft/gostats/internal/tags"
)

Expand Down Expand Up @@ -214,7 +216,14 @@ type StatGenerator interface {
// NewStore returns an Empty store that flushes to Sink passed as an argument.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
return &statStore{sink: sink}
s := &statStore{sink: sink}

// lru.NewWithEvict can only return a non-nil error when cacheSize < 0.
cacheSize := 65536
s.counters, _ = lru.NewWithEvict(cacheSize, s.flushCounter)
s.timers, _ = lru.New[string, *timer](cacheSize)

return s
}

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
Expand Down Expand Up @@ -336,9 +345,11 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
}

type statStore struct {
counters sync.Map
gauges sync.Map
timers sync.Map
counters *lru.Cache[string, *counter]
timers *lru.Cache[string, *timer]
// Gauges must not be expunged because they are client-side stateful.
// We use a sync.Map instead of a cache to ensure they are kept indefinitely.
gauges sync.Map

mu sync.RWMutex
statGenerators []StatGenerator
Expand Down Expand Up @@ -369,13 +380,15 @@ func (s *statStore) Flush() {
}
s.mu.RUnlock()

s.counters.Range(func(key, v interface{}) bool {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
s.sink.FlushCounter(key.(string), value)
for _, name := range s.counters.Keys() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're copying in the LRU code can we add a method that returns a slice of keys and values (this only increases the slices memory footprint by 50%)? Otherwise we have to lock/unlock a mutex and search for the value in the LRU map on each iteration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, seems reasonable.

counter, ok := s.counters.Peek(name)
if !ok {
// This counter was removed between retrieving the names
// and finding this specific counter.
continue
}
return true
})
s.flushCounter(name, counter)
}

s.gauges.Range(func(key, v interface{}) bool {
s.sink.FlushGauge(key.(string), v.(*gauge).Value())
Expand All @@ -388,6 +401,12 @@ func (s *statStore) Flush() {
}
}

func (s *statStore) flushCounter(name string, counter *counter) {
if value := counter.latch(); value != 0 {
s.sink.FlushCounter(name, value)
}
}

func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -407,12 +426,12 @@ func (s *statStore) ScopeWithTags(name string, tags map[string]string) Scope {
}

func (s *statStore) newCounter(serializedName string) *counter {
if v, ok := s.counters.Load(serializedName); ok {
return v.(*counter)
if v, ok := s.counters.Get(serializedName); ok {
return v
}
c := new(counter)
if v, loaded := s.counters.LoadOrStore(serializedName, c); loaded {
return v.(*counter)
if v, ok, _ := s.counters.PeekOrAdd(serializedName, c); ok {
return v
}
return c
}
Expand Down Expand Up @@ -475,12 +494,12 @@ func (s *statStore) NewPerInstanceGauge(name string, tags map[string]string) Gau
}

func (s *statStore) newTimer(serializedName string, base time.Duration) *timer {
if v, ok := s.timers.Load(serializedName); ok {
return v.(*timer)
if v, ok := s.timers.Get(serializedName); ok {
return v
}
t := &timer{name: serializedName, sink: s.sink, base: base}
if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(*timer)
if v, ok, _ := s.timers.PeekOrAdd(serializedName, t); ok {
return v
}
return t
}
Expand Down
6 changes: 3 additions & 3 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func TestTagMapNotModified(t *testing.T) {
}

scopeGenerators := map[string]func() Scope{
"statStore": func() Scope { return &statStore{} },
"subScope": func() Scope { return newSubScope(&statStore{}, "name", nil) },
"statStore": func() Scope { return NewStore(nil, false) },
"subScope": func() Scope { return newSubScope(NewStore(nil, false).(*statStore), "name", nil) },
}

methodTestCases := map[string]TagMethod{
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestPerInstanceStats(t *testing.T) {
testPerInstanceMethods := func(t *testing.T, setupScope func(Scope) Scope) {
for _, x := range testCases {
sink := mock.NewSink()
scope := setupScope(&statStore{sink: sink})
scope := setupScope(NewStore(sink, false).(*statStore))

scope.NewPerInstanceCounter("name", x.tags).Inc()
scope.NewPerInstanceGauge("name", x.tags).Inc()
Expand Down