Skip to content

Commit

Permalink
Fix TestBatchSizes failing on Travis for M3 reporter (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Dec 7, 2017
1 parent f34b223 commit 6c46316
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 83 deletions.
18 changes: 0 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@ PKGS ?= $(shell glide novendor)
PKG_FILES ?= *.go example/*.go m3
LINT_IGNORE = m3/thrift

# The linting tools evolve with each Go version, so run them only on the latest
# stable release.
GO_VERSION := $(shell go version | cut -d " " -f 3)
GO_MINOR_VERSION := $(word 2,$(subst ., ,$(GO_VERSION)))
LINTABLE_MINOR_VERSIONS := 6 7
ifneq ($(filter $(LINTABLE_MINOR_VERSIONS),$(GO_MINOR_VERSION)),)
SHOULD_LINT := true
endif


.PHONY: all
all: lint test

Expand All @@ -26,16 +16,11 @@ dependencies:
@echo "Installing test dependencies..."
go install ./vendor/github.com/axw/gocov/gocov
go install ./vendor/github.com/mattn/goveralls
ifdef SHOULD_LINT
@echo "Installing golint..."
go install ./vendor/github.com/golang/lint/golint
else
@echo "Not installing golint, since we don't expect to lint on" $(GO_VERSION)
endif

.PHONY: lint
lint:
ifdef SHOULD_LINT
@rm -rf lint.log
@echo "Checking formatting..."
@gofmt -d -s $(PKG_FILES) 2>&1 | grep -v $(LINT_IGNORE) | tee lint.log
Expand All @@ -50,9 +35,6 @@ ifdef SHOULD_LINT
@echo "Checking for license headers..."
@./check_license.sh | tee -a lint.log
@[ ! -s lint.log ]
else
@echo "Skipping linters on" $(GO_VERSION)
endif

.PHONY: test
test:
Expand Down
4 changes: 2 additions & 2 deletions histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestMustMakeExponentialDurationBucketsPanicsOnBadFactor(t *testing.T) {
func TestBucketPairsNoRaceWhenSorted(t *testing.T) {
buckets := DurationBuckets{}
for i := 0; i < 99; i++ {
buckets = append(buckets, time.Duration(i) * time.Second)
buckets = append(buckets, time.Duration(i)*time.Second)
}
newPair := func() {
pairs := BucketPairs(buckets)
Expand All @@ -183,7 +183,7 @@ func TestBucketPairsNoRaceWhenSorted(t *testing.T) {
func TestBucketPairsNoRaceWhenUnsorted(t *testing.T) {
buckets := DurationBuckets{}
for i := 100; i > 1; i-- {
buckets = append(buckets, time.Duration(i) * time.Second)
buckets = append(buckets, time.Duration(i)*time.Second)
}
newPair := func() {
pairs := BucketPairs(buckets)
Expand Down
94 changes: 31 additions & 63 deletions m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,17 @@ func TestReporterHistogram(t *testing.T) {
}

func TestBatchSizes(t *testing.T) {
server := newSimpleServer(t)
go server.serve()
defer server.close()
server := newFakeM3Server(t, nil, false, Compact)
go server.Serve()
defer server.Close()

commonTags := map[string]string{
"env": "test",
"domain": "pod" + strconv.Itoa(rand.Intn(100)),
}
maxPacketSize := int32(1440)
r, err := NewReporter(Options{
HostPorts: []string{server.addr()},
HostPorts: []string{server.Addr},
Service: "test-service",
CommonTags: commonTags,
MaxQueueSize: 10000,
Expand Down Expand Up @@ -409,12 +409,12 @@ func TestBatchSizes(t *testing.T) {
r.Close()
}()

for len(server.getPackets()) < 100 {
for len(server.Packets()) < 100 {
time.Sleep(shortInterval)
}

atomic.StoreUint32(&stop, 1)
for _, packet := range server.getPackets() {
for _, packet := range server.Packets() {
require.True(t, len(packet) < int(maxPacketSize))
}
}
Expand Down Expand Up @@ -494,61 +494,6 @@ func TestReporterHasReportingAndTaggingCapability(t *testing.T) {
assert.True(t, r.Capabilities().Tagging())
}

// simpleServer is a minimal UDP listener that records every packet it
// receives, used by tests to inspect raw reporter output.
type simpleServer struct {
	conn *net.UDPConn // bound UDP socket packets arrive on
	t    *testing.T   // test to report read failures against
	// packets holds each received payload in arrival order; guarded by
	// the embedded Mutex.
	packets [][]byte
	sync.Mutex
	// closed is nonzero once close() has been called; accessed atomically.
	closed int32
}

// newSimpleServer binds a UDP socket on an ephemeral port and returns a
// server ready to record incoming packets. Any setup failure fails t.
func newSimpleServer(t *testing.T) *simpleServer {
	udpAddr, err := net.ResolveUDPAddr("udp", ":0")
	require.NoError(t, err)

	udpConn, err := net.ListenUDP(udpAddr.Network(), udpAddr)
	require.NoError(t, err)

	return &simpleServer{
		conn: udpConn,
		t:    t,
	}
}

// serve reads UDP packets until the server is closed, appending each
// payload to s.packets under the mutex. A fresh buffer is allocated after
// every successful read so previously stored slices are never overwritten.
func (s *simpleServer) serve() {
	readBuf := make([]byte, 64000)
	for atomic.LoadInt32(&s.closed) == 0 {
		n, err := s.conn.Read(readBuf)
		if err != nil {
			// Read errors are expected once close() shuts the socket;
			// only report them while the server is still open.
			if atomic.LoadInt32(&s.closed) == 0 {
				s.t.Errorf("FakeM3Server failed to Read: %v", err)
			}
			return
		}
		s.Lock()
		s.packets = append(s.packets, readBuf[0:n])
		s.Unlock()
		readBuf = make([]byte, 64000)
	}
}

// getPackets returns a snapshot of all packets received so far. The outer
// slice is a copy so callers may iterate without holding the lock; the
// per-packet byte slices are shared, but serve never mutates them after
// append.
func (s *simpleServer) getPackets() [][]byte {
	s.Lock()
	defer s.Unlock()

	// Use the copy builtin rather than an element loop, and avoid naming
	// a local "copy", which shadowed the builtin in the original.
	snapshot := make([][]byte, len(s.packets))
	copy(snapshot, s.packets)
	return snapshot
}

// close marks the server closed (so serve stops reporting read errors)
// and shuts the UDP socket, unblocking any in-flight Read in serve.
func (s *simpleServer) close() error {
	atomic.AddInt32(&s.closed, 1)
	return s.conn.Close()
}

// addr reports the host:port string the server is listening on.
func (s *simpleServer) addr() string {
	localAddr := s.conn.LocalAddr()
	return localAddr.String()
}

type fakeM3Server struct {
t *testing.T
Service *fakeM3Service
Expand All @@ -557,6 +502,12 @@ type fakeM3Server struct {
processor thrift.TProcessor
conn *net.UDPConn
closed int32
packets fakeM3ServerPackets
}

// fakeM3ServerPackets stores the raw UDP payloads received by a
// fakeM3Server, guarded by the embedded RWMutex.
type fakeM3ServerPackets struct {
	sync.RWMutex
	// values holds one entry per received packet, in arrival order.
	values [][]byte
}

func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server {
Expand Down Expand Up @@ -585,6 +536,11 @@ func (f *fakeM3Server) Serve() {
}
return
}

f.packets.Lock()
f.packets.values = append(f.packets.values, readBuf[0:n])
f.packets.Unlock()

trans, _ := customtransport.NewTBufferedReadTransport(bytes.NewBuffer(readBuf[0:n]))
var proto thrift.TProtocol
if f.protocol == Compact {
Expand All @@ -601,6 +557,18 @@ func (f *fakeM3Server) Close() error {
return f.conn.Close()
}

// Packets returns a snapshot of all raw UDP packets received so far. The
// outer slice is a copy so callers may iterate without holding the lock;
// the per-packet byte slices are shared with the server.
func (f *fakeM3Server) Packets() [][]byte {
	// This method never mutates the packet list, so take only the read
	// lock (the original took the write lock on an RWMutex).
	f.packets.RLock()
	defer f.packets.RUnlock()

	// Use the copy builtin rather than an element loop, and avoid naming
	// a local "copy", which shadowed the builtin in the original.
	snapshot := make([][]byte, len(f.packets.values))
	copy(snapshot, f.packets.values)
	return snapshot
}

// newFakeM3Service builds a fake M3 service. When countBatches is true
// the WaitGroup is decremented once per batch; otherwise once per metric
// (see EmitMetricBatch). wg may be nil.
func newFakeM3Service(wg *sync.WaitGroup, countBatches bool) *fakeM3Service {
	svc := &fakeM3Service{
		wg:           wg,
		countBatches: countBatches,
	}
	return svc
}
Expand Down Expand Up @@ -628,13 +596,13 @@ func (m *fakeM3Service) getMetrics() []*m3thrift.Metric {
func (m *fakeM3Service) EmitMetricBatch(batch *m3thrift.MetricBatch) (err error) {
m.lock.Lock()
m.batches = append(m.batches, batch)
if m.countBatches {
if m.wg != nil && m.countBatches {
m.wg.Done()
}

for _, metric := range batch.Metrics {
m.metrics = append(m.metrics, metric)
if !m.countBatches {
if m.wg != nil && !m.countBatches {
m.wg.Done()
}
}
Expand Down

0 comments on commit 6c46316

Please sign in to comment.