Skip to content

Commit

Permalink
add custom tags to internal metrics (#231)
Browse files Browse the repository at this point in the history
* add custom tags to internal metrics

* make unit test stricter, update version
  • Loading branch information
shaan420 committed Oct 6, 2023
1 parent 9e0a0b8 commit d083654
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 0 deletions.
5 changes: 5 additions & 0 deletions m3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Configuration struct {
// HistogramBucketTagPrecision is precision to use when formatting the metric tag
// with the histogram bucket bound values.
HistogramBucketTagPrecision uint `yaml:"histogramBucketTagPrecision"`

// CommonTagsInternal are tags that should be added to all internal metrics
// emitted by the reporter.
CommonTagsInternal map[string]string `yaml:"commonTagsInternal"`
}

// NewReporter creates a new M3 reporter from this configuration.
Expand All @@ -66,5 +70,6 @@ func (c Configuration) NewReporter() (Reporter, error) {
MaxPacketSizeBytes: c.PacketSize,
IncludeHost: c.IncludeHost,
HistogramBucketTagPrecision: c.HistogramBucketTagPrecision,
InternalTags: c.CommonTagsInternal,
})
}
1 change: 1 addition & 0 deletions m3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestConfigSimple(t *testing.T) {
assert.True(t, ok)
assert.True(t, tagEquals(reporter.commonTags, "service", "my-service"))
assert.True(t, tagEquals(reporter.commonTags, "env", "test"))
assert.Equal(t, 0, len(c.CommonTagsInternal))
}

func TestConfigMulti(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions m3/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type Options struct {
HistogramBucketIDName string
HistogramBucketName string
HistogramBucketTagPrecision uint
InternalTags map[string]string
}

// NewReporter creates a new M3 reporter.
Expand Down Expand Up @@ -288,6 +289,11 @@ func NewReporter(opts Options) (Reporter, error) {
internalTags := map[string]string{
"version": tally.Version,
}

for k, v := range opts.InternalTags {
internalTags[k] = v
}

r.batchSizeHistogram = r.AllocateHistogram("tally.internal.batch-size", internalTags, buckets)
r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags)
r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags)
Expand Down
59 changes: 59 additions & 0 deletions m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,58 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
require.Equal(t, 0, len(filtered[1].GetTags()))
}

func TestReporterCommmonTagsInternal(t *testing.T) {
var wg sync.WaitGroup
server := newFakeM3Server(t, &wg, false, Compact)
go server.Serve()
defer server.Close()

internalTags := map[string]string{
"internal1": "test1",
"internal2": "test2",
}

r, err := NewReporter(Options{
HostPorts: []string{server.Addr},
Service: "test-service",
CommonTags: defaultCommonTags,
MaxQueueSize: queueSize,
IncludeHost: true,
MaxPacketSizeBytes: maxPacketSize,
InternalTags: internalTags,
})
require.NoError(t, err)
defer r.Close()

c := r.AllocateCounter("testCounter1", nil)
c.ReportCount(1)
wg.Add(internalMetrics + 1)
r.Flush()
wg.Wait()

numInternalMetricsActual := 0
metrics := server.Service.getMetrics()
require.Equal(t, internalMetrics+1, len(metrics))
for _, metric := range metrics {
if strings.HasPrefix(metric.Name, "tally.internal") {
numInternalMetricsActual++
for k, v := range internalTags {
require.True(t, tagEquals(metric.Tags, k, v))
}
} else {
require.Equal(t, "testCounter1", metric.Name)
require.False(t, tagIncluded(metric.Tags, "internal1"))
require.False(t, tagIncluded(metric.Tags, "internal2"))
}
// The following tags should not be present as part of the individual metrics
// as they are common tags.
require.False(t, tagIncluded(metric.Tags, "host"))
require.False(t, tagIncluded(metric.Tags, "instance"))
require.False(t, tagIncluded(metric.Tags, "service"))
}
require.Equal(t, internalMetrics, numInternalMetricsActual)
}

func TestReporterHasReportingAndTaggingCapability(t *testing.T) {
r, err := NewReporter(Options{
HostPorts: []string{"127.0.0.1:9052"},
Expand Down Expand Up @@ -587,6 +639,13 @@ type fakeM3ServerPackets struct {
values [][]byte
}

// newFakeM3Server creates a new fake M3 server that listens on a random port
// and returns the server.
// The server will wait for the given wait group to be done before returning.
// If countBatches is true, the server will wait consider the wg.Add()s to be
// representing batches and will do a eg.Done() for each encountered batch.
// But if countBatches is false, the server will do the same thing but for individual
// metrics instead of batches.
func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server {
service := newFakeM3Service(wg, countBatches)
processor := m3thrift.NewM3Processor(service)
Expand Down

0 comments on commit d083654

Please sign in to comment.