From 6c4631652c6aab57c64f65c2e0aaec2e9aae3a64 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 7 Dec 2017 10:04:56 -0500 Subject: [PATCH] Fix TestBatchSizes failing on Travis for M3 reporter (#70) --- Makefile | 18 --------- histogram_test.go | 4 +- m3/reporter_test.go | 94 +++++++++++++++------------------------------ 3 files changed, 33 insertions(+), 83 deletions(-) diff --git a/Makefile b/Makefile index 9e81ca21..fa1161a1 100644 --- a/Makefile +++ b/Makefile @@ -5,16 +5,6 @@ PKGS ?= $(shell glide novendor) PKG_FILES ?= *.go example/*.go m3 LINT_IGNORE = m3/thrift -# The linting tools evolve with each Go version, so run them only on the latest -# stable release. -GO_VERSION := $(shell go version | cut -d " " -f 3) -GO_MINOR_VERSION := $(word 2,$(subst ., ,$(GO_VERSION))) -LINTABLE_MINOR_VERSIONS := 6 7 -ifneq ($(filter $(LINTABLE_MINOR_VERSIONS),$(GO_MINOR_VERSION)),) -SHOULD_LINT := true -endif - - .PHONY: all all: lint test @@ -26,16 +16,11 @@ dependencies: @echo "Installing test dependencies..." go install ./vendor/github.com/axw/gocov/gocov go install ./vendor/github.com/mattn/goveralls -ifdef SHOULD_LINT @echo "Installing golint..." go install ./vendor/github.com/golang/lint/golint -else - @echo "Not installing golint, since we don't expect to lint on" $(GO_VERSION) -endif .PHONY: lint lint: -ifdef SHOULD_LINT @rm -rf lint.log @echo "Checking formatting..." @gofmt -d -s $(PKG_FILES) 2>&1 | grep -v $(LINT_IGNORE) | tee lint.log @@ -50,9 +35,6 @@ ifdef SHOULD_LINT @echo "Checking for license headers..." @./check_license.sh | tee -a lint.log @[ ! -s lint.log ] -else - @echo "Skipping linters on" $(GO_VERSION) -endif .PHONY: test test: diff --git a/histogram_test.go b/histogram_test.go index c1eb57b1..dcbf173c 100644 --- a/histogram_test.go +++ b/histogram_test.go @@ -169,7 +169,7 @@ func TestMustMakeExponentialDurationBucketsPanicsOnBadFactor(t *testing.T) { func TestBucketPairsNoRaceWhenSorted(t *testing.T) { buckets := DurationBuckets{} for i := 0; i < 99; i++ { - buckets = append(buckets, time.Duration(i) * time.Second) + buckets = append(buckets, time.Duration(i)*time.Second) } newPair := func() { pairs := BucketPairs(buckets) @@ -183,7 +183,7 @@ func TestBucketPairsNoRaceWhenSorted(t *testing.T) { func TestBucketPairsNoRaceWhenUnsorted(t *testing.T) { buckets := DurationBuckets{} for i := 100; i > 1; i-- { - buckets = append(buckets, time.Duration(i) * time.Second) + buckets = append(buckets, time.Duration(i)*time.Second) } newPair := func() { pairs := BucketPairs(buckets) diff --git a/m3/reporter_test.go b/m3/reporter_test.go index d421cfb5..0a6fe589 100644 --- a/m3/reporter_test.go +++ b/m3/reporter_test.go @@ -350,9 +350,9 @@ func TestReporterHistogram(t *testing.T) { } func TestBatchSizes(t *testing.T) { - server := newSimpleServer(t) - go server.serve() - defer server.close() + server := newFakeM3Server(t, nil, false, Compact) + go server.Serve() + defer server.Close() commonTags := map[string]string{ "env": "test", @@ -360,7 +360,7 @@ func TestBatchSizes(t *testing.T) { } maxPacketSize := int32(1440) r, err := NewReporter(Options{ - HostPorts: []string{server.addr()}, + HostPorts: []string{server.Addr}, Service: "test-service", CommonTags: commonTags, MaxQueueSize: 10000, @@ -409,12 +409,12 @@ func TestBatchSizes(t *testing.T) { r.Close() }() - for len(server.getPackets()) < 100 { + for len(server.Packets()) < 100 { time.Sleep(shortInterval) } atomic.StoreUint32(&stop, 1) - for _, packet := range server.getPackets() { + for _, packet := range server.Packets() { require.True(t, len(packet) < int(maxPacketSize)) } } @@ -494,61 +494,6 @@ func TestReporterHasReportingAndTaggingCapability(t *testing.T) { assert.True(t, r.Capabilities().Tagging()) } -type simpleServer struct { - conn *net.UDPConn - t *testing.T - packets [][]byte - sync.Mutex - closed int32 -} - -func newSimpleServer(t *testing.T) *simpleServer { - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - conn, err := net.ListenUDP(addr.Network(), addr) - require.NoError(t, err) - - return &simpleServer{conn: conn, t: t} -} - -func (s *simpleServer) serve() { - readBuf := make([]byte, 64000) - for atomic.LoadInt32(&s.closed) == 0 { - n, err := s.conn.Read(readBuf) - if err != nil { - if atomic.LoadInt32(&s.closed) == 0 { - s.t.Errorf("FakeM3Server failed to Read: %v", err) - } - return - } - s.Lock() - s.packets = append(s.packets, readBuf[0:n]) - s.Unlock() - readBuf = make([]byte, 64000) - } -} - -func (s *simpleServer) getPackets() [][]byte { - s.Lock() - defer s.Unlock() - copy := make([][]byte, len(s.packets)) - for i, packet := range s.packets { - copy[i] = packet - } - - return copy -} - -func (s *simpleServer) close() error { - atomic.AddInt32(&s.closed, 1) - return s.conn.Close() -} - -func (s *simpleServer) addr() string { - return s.conn.LocalAddr().String() -} - type fakeM3Server struct { t *testing.T Service *fakeM3Service @@ -557,6 +502,12 @@ type fakeM3Server struct { processor thrift.TProcessor conn *net.UDPConn closed int32 + packets fakeM3ServerPackets +} + +type fakeM3ServerPackets struct { + sync.RWMutex + values [][]byte } func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server { @@ -585,6 +536,11 @@ func (f *fakeM3Server) Serve() { } return } + + f.packets.Lock() + f.packets.values = append(f.packets.values, readBuf[0:n]) + f.packets.Unlock() + trans, _ := customtransport.NewTBufferedReadTransport(bytes.NewBuffer(readBuf[0:n])) var proto thrift.TProtocol if f.protocol == Compact { @@ -601,6 +557,18 @@ func (f *fakeM3Server) Close() error { return f.conn.Close() } +func (f *fakeM3Server) Packets() [][]byte { + f.packets.Lock() + defer f.packets.Unlock() + + copy := make([][]byte, len(f.packets.values)) + for i, packet := range f.packets.values { + copy[i] = packet + } + + return copy +} + func newFakeM3Service(wg *sync.WaitGroup, countBatches bool) *fakeM3Service { return &fakeM3Service{wg: wg, countBatches: countBatches} } @@ -628,13 +596,13 @@ func (m *fakeM3Service) getMetrics() []*m3thrift.Metric { func (m *fakeM3Service) EmitMetricBatch(batch *m3thrift.MetricBatch) (err error) { m.lock.Lock() m.batches = append(m.batches, batch) - if m.countBatches { + if m.wg != nil && m.countBatches { m.wg.Done() } for _, metric := range batch.Metrics { m.metrics = append(m.metrics, metric) - if !m.countBatches { + if m.wg != nil && !m.countBatches { m.wg.Done() } }