Skip to content

Commit

Permalink
Merge branch 'uber-go:v3' into v3
Browse files Browse the repository at this point in the history
  • Loading branch information
brawndou authored Jan 23, 2024
2 parents 2ad2a13 + 5d0c8e8 commit 99a9aac
Show file tree
Hide file tree
Showing 15 changed files with 473 additions and 44 deletions.
10 changes: 4 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
language: go
sudo: false
go:
- 1.14.x
- 1.15.x
- 1.16.x
env:
global:
- GO15VENDOREXPERIMENT=1
- 1.18.x
- 1.19.x
- 1.20.x
- 1.21.x
cache:
directories:
- vendor
Expand Down
7 changes: 7 additions & 0 deletions generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tally

import (
_ "github.com/golang/mock/mockgen/model"
)

//go:generate mockgen -package tallymock -destination tallymock/stats_reporter.go -imports github.com/uber-go/tally github.com/uber-go/tally StatsReporter
31 changes: 22 additions & 9 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import:
version: ^1.27.0
- package: github.com/golang/protobuf
version: ^1.5.2
- package: github.com/golang/mock
testImport:
- package: github.com/axw/gocov
version: 54b98cfcac0c63fb3f9bd8e7ad241b724d4e985b
Expand Down
5 changes: 5 additions & 0 deletions m3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Configuration struct {
// HistogramBucketTagPrecision is precision to use when formatting the metric tag
// with the histogram bucket bound values.
HistogramBucketTagPrecision uint `yaml:"histogramBucketTagPrecision"`

// InternalTags are tags that should be added to all internal metrics
// emitted by the reporter.
InternalTags map[string]string `yaml:"internalTags"`
}

// NewReporter creates a new M3 reporter from this configuration.
Expand All @@ -66,5 +70,6 @@ func (c Configuration) NewReporter() (Reporter, error) {
MaxPacketSizeBytes: c.PacketSize,
IncludeHost: c.IncludeHost,
HistogramBucketTagPrecision: c.HistogramBucketTagPrecision,
InternalTags: c.InternalTags,
})
}
1 change: 1 addition & 0 deletions m3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestConfigSimple(t *testing.T) {
assert.True(t, ok)
assert.True(t, tagEquals(reporter.commonTags, "service", "my-service"))
assert.True(t, tagEquals(reporter.commonTags, "env", "test"))
assert.Equal(t, 0, len(c.InternalTags))
}

func TestConfigMulti(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion m3/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (
DefaultHistogramBucketIDName = "bucketid"
// DefaultHistogramBucketName is the default histogram bucket name tag name
DefaultHistogramBucketName = "bucket"
// DefaultTagRedactValue is the default tag value to use when redacting
DefaultTagRedactValue = "global"
// DefaultHistogramBucketTagPrecision is the default
// precision to use when formatting the metric tag
// with the histogram bucket bound values.
Expand Down Expand Up @@ -149,6 +151,7 @@ type Options struct {
HistogramBucketIDName string
HistogramBucketName string
HistogramBucketTagPrecision uint
InternalTags map[string]string
}

// NewReporter creates a new M3 reporter.
Expand Down Expand Up @@ -286,8 +289,15 @@ func NewReporter(opts Options) (Reporter, error) {
}

internalTags := map[string]string{
"version": tally.Version,
"version": tally.Version,
"host": DefaultTagRedactValue,
"instance": DefaultTagRedactValue,
}

for k, v := range opts.InternalTags {
internalTags[k] = v
}

r.batchSizeHistogram = r.AllocateHistogram("tally.internal.batch-size", internalTags, buckets)
r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags)
r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags)
Expand Down
62 changes: 62 additions & 0 deletions m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,61 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
require.Equal(t, 0, len(filtered[1].GetTags()))
}

func TestReporterCommmonTagsInternal(t *testing.T) {
var wg sync.WaitGroup
server := newFakeM3Server(t, &wg, false, Compact)
go server.Serve()
defer server.Close()

internalTags := map[string]string{
"internal1": "test1",
"internal2": "test2",
}

r, err := NewReporter(Options{
HostPorts: []string{server.Addr},
Service: "test-service",
CommonTags: defaultCommonTags,
MaxQueueSize: queueSize,
IncludeHost: true,
MaxPacketSizeBytes: maxPacketSize,
InternalTags: internalTags,
})
require.NoError(t, err)
defer r.Close()

c := r.AllocateCounter("testCounter1", nil)
c.ReportCount(1)
wg.Add(internalMetrics + 1)
r.Flush()
wg.Wait()

numInternalMetricsActual := 0
metrics := server.Service.getMetrics()
require.Equal(t, internalMetrics+1, len(metrics))
for _, metric := range metrics {
if strings.HasPrefix(metric.Name, "tally.internal") {
numInternalMetricsActual++
for k, v := range internalTags {
require.True(t, tagEquals(metric.Tags, k, v))
}

// The following tags should be redacted.
require.True(t, tagEquals(metric.Tags, "host", DefaultTagRedactValue))
require.True(t, tagEquals(metric.Tags, "instance", DefaultTagRedactValue))
} else {
require.Equal(t, "testCounter1", metric.Name)
require.False(t, tagIncluded(metric.Tags, "internal1"))
require.False(t, tagIncluded(metric.Tags, "internal2"))
}

// The following tags should not be present as part of the individual metrics
// as they are common tags.
require.False(t, tagIncluded(metric.Tags, "service"))
}
require.Equal(t, internalMetrics, numInternalMetricsActual)
}

func TestReporterHasReportingAndTaggingCapability(t *testing.T) {
r, err := NewReporter(Options{
HostPorts: []string{"127.0.0.1:9052"},
Expand Down Expand Up @@ -587,6 +642,13 @@ type fakeM3ServerPackets struct {
values [][]byte
}

// newFakeM3Server creates a new fake M3 server that listens on a random port
// and returns the server.
// The server will wait for the given wait group to be done before returning.
// If countBatches is true, the server will wait consider the wg.Add()s to be
// representing batches and will do a eg.Done() for each encountered batch.
// But if countBatches is false, the server will do the same thing but for individual
// metrics instead of batches.
func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server {
service := newFakeM3Service(wg, countBatches)
processor := m3thrift.NewM3Processor(service)
Expand Down
35 changes: 25 additions & 10 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ const (
// OmitInternalMetrics turns off internal metrics submission.
OmitInternalMetrics

_defaultInitialSliceSize = 16
_defaultInitialSliceSize = 16
_defaultReportingInterval = 2 * time.Second
)

var (
Expand Down Expand Up @@ -101,19 +102,22 @@ type scope struct {
done chan struct{}
wg sync.WaitGroup
root bool
testScope bool
}

// ScopeOptions is a set of options to construct a scope.
type ScopeOptions struct {
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
MetricsOption InternalMetricOption

testScope bool
registryShardCount uint
MetricsOption InternalMetricOption
}

// NewRootScope creates a new root Scope with a set of options and
Expand All @@ -124,14 +128,24 @@ func NewRootScope(opts ScopeOptions, interval time.Duration) (Scope, io.Closer)
return s, s
}

// NewRootScopeWithDefaultInterval invokes NewRootScope with the default
// reporting interval of 2s.
func NewRootScopeWithDefaultInterval(opts ScopeOptions) (Scope, io.Closer) {
return NewRootScope(opts, _defaultReportingInterval)
}

// NewTestScope creates a new Scope without a stats reporter with the
// given prefix and adds the ability to take snapshots of metrics emitted
// to it.
func NewTestScope(
prefix string,
tags map[string]string,
) TestScope {
return newRootScope(ScopeOptions{Prefix: prefix, Tags: tags}, 0)
return newRootScope(ScopeOptions{
Prefix: prefix,
Tags: tags,
testScope: true,
}, 0)
}

func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
Expand Down Expand Up @@ -176,6 +190,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
testScope: opts.testScope,
}

// NB(r): Take a copy of the tags on creation
Expand Down
60 changes: 47 additions & 13 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,56 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
// as the memory layout of []byte is a superset of string the below casting is safe and does not do any alloc
// However it cannot be used outside of the stack; a heap allocation is needed if that string needs to be stored
// in the map as a key
if s, ok := r.lockedLookup(subscopeBucket, *(*string)(unsafe.Pointer(&buf))); ok {
subscopeBucket.mu.RUnlock()
return s
var (
unsanitizedKey = *(*string)(unsafe.Pointer(&buf))
sanitizedKey string
)

s, ok := r.lockedLookup(subscopeBucket, unsanitizedKey)
if ok {
// If this subscope isn't closed or is a test scope, return it.
// Otherwise, report it immediately and delete it so that a new
// (functional) scope can be returned instead.
if !s.closed.Load() || s.testScope {
subscopeBucket.mu.RUnlock()
return s
}

switch {
case parent.reporter != nil:
s.report(parent.reporter)
case parent.cachedReporter != nil:
s.cachedReport()
}
}
subscopeBucket.mu.RUnlock()

// heap allocating the buf as a string to keep the key in the subscopes map
preSanitizeKey := string(buf)
tags = parent.copyAndSanitizeMap(tags)
key := scopeRegistryKey(prefix, parent.tags, tags)
sanitizedKey = scopeRegistryKey(prefix, parent.tags, tags)

// If a scope was found above but we didn't return, we need to remove the
// scope from both keys.
if ok {
r.removeWithRLock(subscopeBucket, unsanitizedKey)
r.removeWithRLock(subscopeBucket, sanitizedKey)
s.clearMetrics()
}

subscopeBucket.mu.RUnlock()

// Force-allocate the unsafe string as a safe string. Note that neither
// string(x) nor x+"" will have the desired effect (the former is a nop,
// and the latter will likely be elided), so append a new character and
// truncate instead.
//
// ref: https://go.dev/play/p/sxhExUKSxCw
unsanitizedKey = (unsanitizedKey + ".")[:len(unsanitizedKey)]

subscopeBucket.mu.Lock()
defer subscopeBucket.mu.Unlock()

if s, ok := r.lockedLookup(subscopeBucket, key); ok {
if _, ok = r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
subscopeBucket.s[preSanitizeKey] = s
if s, ok := r.lockedLookup(subscopeBucket, sanitizedKey); ok {
if _, ok = r.lockedLookup(subscopeBucket, unsanitizedKey); !ok {
subscopeBucket.s[unsanitizedKey] = s
}
return s
}
Expand Down Expand Up @@ -196,10 +229,11 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
timers: make(map[string]*timer),
bucketCache: parent.bucketCache,
done: make(chan struct{}),
testScope: parent.testScope,
}
subscopeBucket.s[key] = subscope
if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
subscopeBucket.s[preSanitizeKey] = subscope
subscopeBucket.s[sanitizedKey] = subscope
if _, ok := r.lockedLookup(subscopeBucket, unsanitizedKey); !ok {
subscopeBucket.s[unsanitizedKey] = subscope
}
return subscope
}
Expand Down
Loading

0 comments on commit 99a9aac

Please sign in to comment.