Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable configurable TTL-based scope/metric deallocation #150

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
language: go
sudo: false
go:
- 1.13.x
- 1.14.x
- tip
- 1.15.x
- 1.16.x
env:
global:
- GO15VENDOREXPERIMENT=1
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export GO15VENDOREXPERIMENT=1

BENCH_FLAGS ?= -cpuprofile=cpu.pprof -memprofile=mem.pprof -benchmem
PKGS ?= $(shell glide novendor)
PKG_FILES ?= *.go example/*.go m3
PKG_FILES ?= *.go example/*.go m3/*.go m3/customtransports m3/thriftudp
LINT_IGNORE = m3/thrift\|thirdparty
LICENSE_IGNORE = thirdparty

Expand Down
18 changes: 10 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import:
version: ^1
- package: github.com/pkg/errors
version: ^0.8.1
- package: github.com/twmb/murmur3
version: ^1.1.5
testImport:
- package: github.com/axw/gocov
version: 54b98cfcac0c63fb3f9bd8e7ad241b724d4e985b
Expand Down
115 changes: 77 additions & 38 deletions histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ var (
errBucketsCountNeedsGreaterThanZero = errors.New("n needs to be > 0")
errBucketsStartNeedsGreaterThanZero = errors.New("start needs to be > 0")
errBucketsFactorNeedsGreaterThanOne = errors.New("factor needs to be > 1")

_singleBucket = bucketPair{
lowerBoundDuration: time.Duration(math.MinInt64),
upperBoundDuration: time.Duration(math.MaxInt64),
lowerBoundValue: -math.MaxFloat64,
upperBoundValue: math.MaxFloat64,
}
)

// ValueBuckets is a set of float64 values that implements Buckets.
Expand Down Expand Up @@ -119,73 +126,105 @@ func (v DurationBuckets) AsDurations() []time.Duration {
return []time.Duration(v)
}

func newBucketPair(
htype histogramType,
durations []time.Duration,
prevDuration time.Duration,
values []float64,
prevValue float64,
upperBoundIndex int,
) bucketPair {
var pair bucketPair

switch htype {
case durationHistogramType:
pair = bucketPair{
lowerBoundDuration: prevDuration,
upperBoundDuration: durations[upperBoundIndex],
}
case valueHistogramType:
pair = bucketPair{
lowerBoundValue: prevValue,
upperBoundValue: values[upperBoundIndex],
}
default:
// nop
}

return pair
}

// BucketPairs creates a set of bucket pairs from a set
// of buckets describing the lower and upper bounds for
// each derived bucket.
func BucketPairs(buckets Buckets) []BucketPair {
htype := valueHistogramType
if _, ok := buckets.(DurationBuckets); ok {
htype = durationHistogramType
}

if buckets == nil || buckets.Len() < 1 {
return []BucketPair{

bucketPair{
lowerBoundValue: -math.MaxFloat64,
upperBoundValue: math.MaxFloat64,
lowerBoundDuration: time.Duration(math.MinInt64),
upperBoundDuration: time.Duration(math.MaxInt64),
},
}
return []BucketPair{_singleBucket}
}

var (
asValueBuckets = copyAndSortValues(buckets.AsValues())
asDurationBuckets = copyAndSortDurations(buckets.AsDurations())
pairs = make([]BucketPair, 0, buckets.Len()+2)
values []float64
durations []time.Duration
prevDuration = _singleBucket.lowerBoundDuration
prevValue = _singleBucket.lowerBoundValue
pairs = make([]BucketPair, 0, buckets.Len()+2)
pair bucketPair
)

pairs = append(pairs, bucketPair{
lowerBoundValue: -math.MaxFloat64,
upperBoundValue: asValueBuckets[0],
lowerBoundDuration: time.Duration(math.MinInt64),
upperBoundDuration: asDurationBuckets[0],
})
switch htype {
case durationHistogramType:
durations = copyAndSortDurations(buckets.AsDurations())
pair.lowerBoundDuration = prevDuration
pair.upperBoundDuration = durations[0]
case valueHistogramType:
values = copyAndSortValues(buckets.AsValues())
pair.lowerBoundValue = prevValue
pair.upperBoundValue = values[0]
default:
// n.b. This branch will never be executed because htype is only ever
// one of two values.
panic("unsupported histogram type")
}

prevValueBucket, prevDurationBucket :=
asValueBuckets[0], asDurationBuckets[0]
pairs = append(pairs, pair)
prevDuration = pairs[0].UpperBoundDuration()
prevValue = pairs[0].UpperBoundValue()

for i := 1; i < buckets.Len(); i++ {
pairs = append(pairs, bucketPair{
lowerBoundValue: prevValueBucket,
upperBoundValue: asValueBuckets[i],
lowerBoundDuration: prevDurationBucket,
upperBoundDuration: asDurationBuckets[i],
})
prevValueBucket, prevDurationBucket =
asValueBuckets[i], asDurationBuckets[i]
pairs = append(
pairs,
newBucketPair(htype, durations, prevDuration, values, prevValue, i),
)

prevValue = pairs[i].UpperBoundValue()
prevDuration = pairs[i].UpperBoundDuration()
}

pairs = append(pairs, bucketPair{
lowerBoundValue: prevValueBucket,
upperBoundValue: math.MaxFloat64,
lowerBoundDuration: prevDurationBucket,
upperBoundDuration: time.Duration(math.MaxInt64),
lowerBoundValue: prevValue,
upperBoundValue: _singleBucket.upperBoundValue,
lowerBoundDuration: prevDuration,
upperBoundDuration: _singleBucket.upperBoundDuration,
})

return pairs
}

func copyAndSortValues(values []float64) []float64 {
valuesCopy := make([]float64, len(values))
for i := range values {
valuesCopy[i] = values[i]
}
copy(valuesCopy, values)
sort.Sort(ValueBuckets(valuesCopy))
return valuesCopy
}

func copyAndSortDurations(durations []time.Duration) []time.Duration {
durationsCopy := make([]time.Duration, len(durations))
for i := range durations {
durationsCopy[i] = durations[i]
}
copy(durationsCopy, durations)
sort.Sort(DurationBuckets(durationsCopy))
return durationsCopy
}
Expand Down
83 changes: 83 additions & 0 deletions internal/identity/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package identity

import (
"github.com/twmb/murmur3"
)

const (
_hashSeed uint64 = 23
_hashFold uint64 = 31
)

// Accumulator is a commutative folding accumulator.
type Accumulator uint64

// NewAccumulator creates a new Accumulator with a default seed value.
//
// n.b. Here and elsewhere, we use nosplit to avoid stack size checks, which
// are unnecessary as memory width is bounded to each instance of `a` (a
// uint64) and, potentially, a single stack-local loop temporary while
// iterating.
func NewAccumulator() Accumulator {
return Accumulator(_hashSeed)
}

// NewAccumulatorWithSeed creates a new Accumulator with the provided seed value.
func NewAccumulatorWithSeed(seed uint64) Accumulator {
return Accumulator(seed)
}

// AddString hashes str and folds it into the accumulator.
func (a Accumulator) AddString(str string) Accumulator {
return a + (Accumulator(murmur3.StringSum64(str)) * Accumulator(_hashFold))
}

// AddStrings serially hashes and folds each of strs into the accumulator.
//go:nosplit
func (a Accumulator) AddStrings(strs ...string) Accumulator {
for _, str := range strs {
a += (Accumulator(murmur3.StringSum64(str)) * Accumulator(_hashFold))
}

return a
}

// AddUint64 folds u64 into the accumulator.
func (a Accumulator) AddUint64(u64 uint64) Accumulator {
return a + Accumulator(u64*_hashFold)
}

// AddUint64s serially folds each of u64s into the accumulator.
//go:nosplit
func (a Accumulator) AddUint64s(u64s ...uint64) Accumulator {
for _, u64 := range u64s {
a += Accumulator(u64 * _hashFold)
}

return a
}

// Value returns the accumulated value.
func (a Accumulator) Value() uint64 {
return uint64(a)
}
1 change: 1 addition & 0 deletions m3/customtransports/buffered_read_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestTBufferedReadTransport(t *testing.T) {
secondRead := make([]byte, 7)
n, err = trans.Read(secondRead)
require.Equal(t, 6, n)
require.NoError(t, err)
require.Equal(t, []byte("String"), secondRead[0:6])
require.Equal(t, uint64(0), trans.RemainingBytes())
}
Expand Down
7 changes: 5 additions & 2 deletions m3/example/local_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/uber-go/tally/m3"
customtransport "github.com/uber-go/tally/m3/customtransports"
m3thrift "github.com/uber-go/tally/m3/thrift"
m3thrift "github.com/uber-go/tally/m3/thrift/v1"
"github.com/uber-go/tally/thirdparty/github.com/apache/thrift/lib/go/thrift"
)

Expand Down Expand Up @@ -86,7 +86,10 @@ func (f *localM3Server) Serve() error {
} else {
proto = thrift.NewTBinaryProtocolTransport(trans)
}
f.processor.Process(proto, proto)

if _, err = f.processor.Process(proto, proto); err != nil {
fmt.Println("Error processing thrift metric:", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion m3/example/m3_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/uber-go/tally"
"github.com/uber-go/tally/m3"
m3thrift "github.com/uber-go/tally/m3/thrift"
m3thrift "github.com/uber-go/tally/m3/thrift/v1"

validator "gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
Expand Down
Loading