Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M3 aggregator test #32

Open
wants to merge 3 commits into
base: db_main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docker/m3aggregator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ RUN cd /go/src/github.com/m3db/m3/ && \
FROM alpine:3.11
LABEL maintainer="The M3DB Authors <[email protected]>"

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk update
RUN apk add --no-cache bash
RUN apk add --no-cache iperf3
RUN apk add --no-cache curl

RUN apk add --no-cache tzdata
RUN apk add --no-cache tar

RUN curl -o /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz -L https://github.com/fullstorydev/grpcurl/releases/download/v1.3.1/grpcurl_1.3.1_linux_x86_64.tar.gz
RUN tar -xvf /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz
RUN mv grpcurl /bin

EXPOSE 5000/tcp 6000/tcp 6001/tcp

RUN apk add --no-cache curl jq
Expand All @@ -26,4 +41,4 @@ COPY --from=builder /go/src/github.com/m3db/m3/bin/m3aggregator /bin/
COPY --from=builder /go/src/github.com/m3db/m3/src/aggregator/config/m3aggregator.yml /etc/m3aggregator/m3aggregator.yml

ENTRYPOINT [ "/bin/m3aggregator" ]
CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ]
CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ]
16 changes: 13 additions & 3 deletions docker/m3coordinator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# stage 1: build
FROM golang:1.16.5-alpine3.13 AS builder
FROM golang:1.18-alpine3.15 AS builder
LABEL maintainer="The M3DB Authors <[email protected]>"

# Install deps
Expand All @@ -15,18 +15,28 @@ RUN cd /go/src/github.com/m3db/m3/ && \
make m3coordinator-linux-amd64

# stage 2: lightweight "release"
FROM alpine:3.11
FROM alpine:3.14
LABEL maintainer="The M3DB Authors <[email protected]>"

# Provide timezone data to allow TZ environment variable to be set
# for parsing relative times such as "9am" correctly and respect
# the TZ environment variable.
RUN apk update
RUN apk add --no-cache bash
RUN apk add --no-cache iperf3
RUN apk add --no-cache curl

RUN apk add --no-cache tzdata
RUN apk add --no-cache tar

RUN curl -o /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz -L https://github.com/fullstorydev/grpcurl/releases/download/v1.3.1/grpcurl_1.3.1_linux_x86_64.tar.gz
RUN tar -xvf /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz
RUN mv grpcurl /bin

EXPOSE 7201/tcp 7203/tcp

COPY --from=builder /go/src/github.com/m3db/m3/bin/m3coordinator /bin/
COPY --from=builder /go/src/github.com/m3db/m3/src/query/config/m3coordinator-local-etcd.yml /etc/m3coordinator/m3coordinator.yml

ENTRYPOINT [ "/bin/m3coordinator" ]
CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ]
CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ]
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func (agg *aggregator) AddTimedWithStagedMetadatas(
agg.metrics.addTimed.ReportError(err, agg.electionManager.ElectionState())
return err
}
//agg.logger.Debug("agg_test, AddTimedWithStagedMetadatas:" + string(metric.String()))

if err = shard.AddTimedWithStagedMetadatas(metric, metas); err != nil {
agg.metrics.addTimed.ReportError(err, agg.electionManager.ElectionState())
return err
Expand Down
6 changes: 5 additions & 1 deletion src/aggregator/aggregator/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type entryMetrics struct {
}

// NewEntryMetrics creates new entry metrics.
//
//nolint:golint,revive
func NewEntryMetrics(scope tally.Scope) *entryMetrics {
scope = scope.SubScope("entry")
Expand Down Expand Up @@ -858,7 +859,7 @@ func (e *Entry) checkTimestampForTimedMetric(
timestamp.UnixNano(), pastLimit.UnixNano())
return xerrors.NewRenamedError(errTooFarInThePast, err)
}
return nil
//return nil
}

func (e *Entry) updateTimedMetadataWithLock(
Expand Down Expand Up @@ -1011,6 +1012,9 @@ func (e *Entry) checkLatenessForForwardedMetric(
if metadata.ResendEnabled {
maxLatenessAllowed = e.opts.BufferForPastTimedMetricFn()(resolution)
}

//return nil

if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
return nil
}
Expand Down
18 changes: 16 additions & 2 deletions src/aggregator/aggregator/forwarded_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package aggregator
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/m3db/m3/src/aggregator/client"
"github.com/m3db/m3/src/aggregator/hash"
Expand All @@ -35,6 +37,7 @@ import (

"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -162,6 +165,7 @@ type forwardedWriter struct {
aggregationMetrics *forwardedAggregationMetrics
nowFn clock.NowFn
bufferForPastTimedMetricFn BufferForPastTimedMetricFn
logger *zap.Logger
}

func newForwardedWriter(
Expand All @@ -178,6 +182,7 @@ func newForwardedWriter(
aggregationMetrics: newForwardedAggregationMetrics(scope.SubScope("aggregations")),
bufferForPastTimedMetricFn: opts.BufferForPastTimedMetricFn(),
nowFn: opts.ClockOptions().NowFn(),
logger: opts.InstrumentOptions().Logger(),
}
}

Expand All @@ -195,7 +200,7 @@ func (w *forwardedWriter) Register(metric Registerable) (writeForwardedMetricFn,
key := newIDKey(metric.Type(), metricID)
fa, exists := w.aggregations[key]
if !exists {
fa = w.newForwardedAggregation(metric.Type(), metricID)
fa = w.newForwardedAggregation(metric.Type(), metricID, w.logger)
w.aggregations[key] = fa
}
if err := fa.add(metric); err != nil {
Expand Down Expand Up @@ -402,9 +407,10 @@ type forwardedAggregation struct {
onDoneFn onForwardedAggregationDoneFn
bufferForPastTimedMetricFn BufferForPastTimedMetricFn
nowFn clock.NowFn
logger *zap.Logger
}

func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metricID id.RawID) *forwardedAggregation {
func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metricID id.RawID, logger *zap.Logger) *forwardedAggregation {
agg := &forwardedAggregation{
metricType: metricType,
metricID: metricID,
Expand All @@ -414,6 +420,7 @@ func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metric
metrics: w.aggregationMetrics,
bufferForPastTimedMetricFn: w.bufferForPastTimedMetricFn,
nowFn: w.nowFn,
logger: logger,
}
agg.writeFn = agg.write
agg.onDoneFn = agg.onDone
Expand Down Expand Up @@ -499,6 +506,9 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error {
agg.metrics.onDoneNoWrite.Inc(1)
return nil
}

//agg.logger.Debug("agg_test, onDone, aggregationKey:" + key.pipeline.String())

if agg.byKey[idx].currRefCnt == agg.byKey[idx].totalRefCnt {
var (
multiErr = xerrors.NewMultiError()
Expand All @@ -525,6 +535,10 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error {
Version: b.version,
}
b.version++

if strings.Contains(metric.String(), "rulemanager_latest_rule_evaluation_time") {
agg.logger.Debug("agg_test, onDone, metric key:" + string(metric.String()) + ", timeNano:" + strconv.FormatInt(metric.TimeNanos, 10))
}
if err := agg.client.WriteForwarded(metric, meta); err != nil {
multiErr = multiErr.Add(err)
agg.metrics.onDoneWriteErrors.Inc(1)
Expand Down
36 changes: 35 additions & 1 deletion src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type baseMetricList struct {
timestampNanosFn timestampNanosFn

closed bool
log *zap.Logger
aggregations *list.List
lastFlushedNanos int64
toCollect []*list.Element
Expand Down Expand Up @@ -214,6 +215,7 @@ func newBaseMetricList(
timestampNanosFn: timestampNanosFn,
aggregations: list.New(),
metrics: newMetricListMetrics(scope),
log: opts.InstrumentOptions().Logger(),
}
l.flushBeforeFn = l.flushBefore
l.consumeLocalMetricFn = l.consumeLocalMetric
Expand Down Expand Up @@ -257,6 +259,11 @@ func (l *baseMetricList) PushBack(value metricElem) (*list.Element, error) {
l.Unlock()
return nil, errListClosed
}

//if strings.Contains(value.ID().String(), "node_network_transmit_queue_length:") {
// l.log.Debug("agg_test, push back: " + value.ID().String())
//}

elem := l.aggregations.PushBack(value)
if !hasForwardedID {
l.Unlock()
Expand Down Expand Up @@ -370,6 +377,9 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
// If the element is eligible for collection after the values are
// processed, add it to the list of elements to collect.
elem := e.Value.(metricElem)

//l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type()))

if elem.Consume(
beforeNanos,
l.isEarlierThanFn,
Expand Down
5 changes: 5 additions & 0 deletions src/aggregator/client/m3msg_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type M3MsgClient struct {
nowFn clock.NowFn
shardFn sharding.ShardFn
metrics m3msgClientMetrics
log *zap.Logger
}

type m3msgClient struct {
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewM3MsgClient(opts Options) (Client, error) {
nowFn: opts.ClockOptions().NowFn(),
shardFn: opts.ShardFn(),
metrics: newM3msgClientMetrics(iOpts.MetricsScope(), iOpts.TimerOptions()),
log: logger,
}, nil
}

Expand Down Expand Up @@ -201,6 +203,8 @@ func (c *M3MsgClient) WriteTimedWithStagedMetadatas(
metadatas: metadatas,
},
}

//c.log.Debug("agg_test, WriteTimedWithStagedMetadatas:" + string(metric.String()))
err := c.write(metric.ID, payload)
c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
Expand Down Expand Up @@ -319,6 +323,7 @@ func newMessage(pool *messagePool) *message {
}

// Encode encodes a m3msg payload
//
//nolint:gocyclo,gocritic
func (m *message) Encode(
shard uint32,
Expand Down
Loading