From 05a3d0f8b313231fe60f86fd7bb0709645143541 Mon Sep 17 00:00:00 2001 From: David Yuan Date: Wed, 31 Aug 2022 09:20:24 -0700 Subject: [PATCH 1/3] adding log to understand the m3coordinator and m3aggregator data flow --- docker/m3aggregator/Dockerfile | 17 ++++- docker/m3coordinator/Dockerfile | 16 +++- src/aggregator/aggregator/aggregator.go | 2 + src/aggregator/aggregator/forwarded_writer.go | 15 +++- src/aggregator/aggregator/gauge_elem_gen.go | 20 ++++- src/aggregator/aggregator/list.go | 13 ++++ src/aggregator/client/m3msg_client.go | 5 ++ .../downsample/metrics_appender.go | 57 ++++++++++++--- .../m3coordinator/ingest/m3msg/ingest.go | 32 ++++++-- .../services/m3coordinator/ingest/write.go | 42 ++++++++++- .../server/m3msg/protobuf_handler.go | 11 +++ src/metrics/matcher/ruleset.go | 5 +- src/metrics/rules/active_ruleset.go | 73 ++++++++++++++++++- src/metrics/rules/ruleset.go | 6 +- src/metrics/rules/store/kv/store.go | 2 +- src/query/api/v1/handler/json/write.go | 8 +- 16 files changed, 287 insertions(+), 37 deletions(-) diff --git a/docker/m3aggregator/Dockerfile b/docker/m3aggregator/Dockerfile index 6e29d327c9..3596ec9f15 100644 --- a/docker/m3aggregator/Dockerfile +++ b/docker/m3aggregator/Dockerfile @@ -18,6 +18,21 @@ RUN cd /go/src/github.com/m3db/m3/ && \ FROM alpine:3.11 LABEL maintainer="The M3DB Authors " +# Provide timezone data to allow TZ environment variable to be set +# for parsing relative times such as "9am" correctly and respect +# the TZ environment variable. +RUN apk update +RUN apk add --no-cache bash +RUN apk add --no-cache iperf3 +RUN apk add --no-cache curl + +RUN apk add --no-cache tzdata +RUN apk add --no-cache tar + +RUN curl -o /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz -L https://github.com/fullstorydev/grpcurl/releases/download/v1.3.1/grpcurl_1.3.1_linux_x86_64.tar.gz +RUN tar -xvf /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz +RUN mv grpcurl /bin + EXPOSE 5000/tcp 6000/tcp 6001/tcp RUN apk add --no-cache curl jq @@ -26,4 +41,4 @@ COPY --from=builder /go/src/github.com/m3db/m3/bin/m3aggregator /bin/ COPY --from=builder /go/src/github.com/m3db/m3/src/aggregator/config/m3aggregator.yml /etc/m3aggregator/m3aggregator.yml ENTRYPOINT [ "/bin/m3aggregator" ] -CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ] +CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ] \ No newline at end of file diff --git a/docker/m3coordinator/Dockerfile b/docker/m3coordinator/Dockerfile index c54427a106..378a2117b3 100644 --- a/docker/m3coordinator/Dockerfile +++ b/docker/m3coordinator/Dockerfile @@ -1,5 +1,5 @@ # stage 1: build -FROM golang:1.16.5-alpine3.13 AS builder +FROM golang:1.18-alpine3.15 AS builder LABEL maintainer="The M3DB Authors " # Install deps @@ -15,13 +15,23 @@ RUN cd /go/src/github.com/m3db/m3/ && \ make m3coordinator-linux-amd64 # stage 2: lightweight "release" -FROM alpine:3.11 +FROM alpine:3.14 LABEL maintainer="The M3DB Authors " # Provide timezone data to allow TZ environment variable to be set # for parsing relative times such as "9am" correctly and respect # the TZ environment variable. 
+RUN apk update +RUN apk add --no-cache bash +RUN apk add --no-cache iperf3 +RUN apk add --no-cache curl + RUN apk add --no-cache tzdata +RUN apk add --no-cache tar + +RUN curl -o /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz -L https://github.com/fullstorydev/grpcurl/releases/download/v1.3.1/grpcurl_1.3.1_linux_x86_64.tar.gz +RUN tar -xvf /tmp/grpcurl_1.3.1_linux_x86_64.tar.gz +RUN mv grpcurl /bin EXPOSE 7201/tcp 7203/tcp @@ -29,4 +39,4 @@ COPY --from=builder /go/src/github.com/m3db/m3/bin/m3coordinator /bin/ COPY --from=builder /go/src/github.com/m3db/m3/src/query/config/m3coordinator-local-etcd.yml /etc/m3coordinator/m3coordinator.yml ENTRYPOINT [ "/bin/m3coordinator" ] -CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ] +CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ] \ No newline at end of file diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 524b36a301..ff974ab583 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -342,6 +342,8 @@ func (agg *aggregator) AddTimedWithStagedMetadatas( agg.metrics.addTimed.ReportError(err, agg.electionManager.ElectionState()) return err } + //agg.logger.Debug("agg_test, AddTimedWithStagedMetadatas:" + string(metric.String())) + if err = shard.AddTimedWithStagedMetadatas(metric, metas); err != nil { agg.metrics.addTimed.ReportError(err, agg.electionManager.ElectionState()) return err diff --git a/src/aggregator/aggregator/forwarded_writer.go b/src/aggregator/aggregator/forwarded_writer.go index c0bbae0169..5e54e12406 100644 --- a/src/aggregator/aggregator/forwarded_writer.go +++ b/src/aggregator/aggregator/forwarded_writer.go @@ -35,6 +35,7 @@ import ( "github.com/uber-go/tally" "go.uber.org/atomic" + "go.uber.org/zap" ) const ( @@ -162,6 +163,7 @@ type forwardedWriter struct { aggregationMetrics *forwardedAggregationMetrics nowFn clock.NowFn bufferForPastTimedMetricFn BufferForPastTimedMetricFn + logger *zap.Logger } func newForwardedWriter( @@ -178,6 +180,7 @@ func newForwardedWriter( aggregationMetrics: newForwardedAggregationMetrics(scope.SubScope("aggregations")), bufferForPastTimedMetricFn: opts.BufferForPastTimedMetricFn(), nowFn: opts.ClockOptions().NowFn(), + logger: opts.InstrumentOptions().Logger(), } } @@ -195,7 +198,7 @@ func (w *forwardedWriter) Register(metric Registerable) (writeForwardedMetricFn, key := newIDKey(metric.Type(), metricID) fa, exists := w.aggregations[key] if !exists { - fa = w.newForwardedAggregation(metric.Type(), metricID) + fa = w.newForwardedAggregation(metric.Type(), metricID, w.logger) w.aggregations[key] = fa } if err := fa.add(metric); err != nil { @@ -402,9 +405,10 @@ type forwardedAggregation struct { onDoneFn onForwardedAggregationDoneFn bufferForPastTimedMetricFn BufferForPastTimedMetricFn nowFn clock.NowFn + logger *zap.Logger } -func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metricID id.RawID) *forwardedAggregation { +func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metricID id.RawID, logger *zap.Logger) *forwardedAggregation { agg := &forwardedAggregation{ metricType: metricType, metricID: metricID, @@ -414,6 +418,7 @@ func (w *forwardedWriter) newForwardedAggregation(metricType metric.Type, metric metrics: w.aggregationMetrics, bufferForPastTimedMetricFn: w.bufferForPastTimedMetricFn, nowFn: w.nowFn, + logger: logger, } agg.writeFn = agg.write agg.onDoneFn = agg.onDone @@ -499,6 +504,9 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error { 
agg.metrics.onDoneNoWrite.Inc(1) return nil } + + agg.logger.Debug("agg_test, onDone, aggregationKey:" + key.pipeline.String()) + if agg.byKey[idx].currRefCnt == agg.byKey[idx].totalRefCnt { var ( multiErr = xerrors.NewMultiError() @@ -525,6 +533,9 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error { Version: b.version, } b.version++ + + agg.logger.Debug("agg_test, onDone, metric key:" + string(metric.String())) + if err := agg.client.WriteForwarded(metric, meta); err != nil { multiErr = multiErr.Add(err) agg.metrics.onDoneWriteErrors.Inc(1) diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index fdc8a0665c..6bd0a7dc54 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -27,6 +27,7 @@ package aggregator import ( "fmt" "math" + "strconv" "sync" "time" @@ -37,6 +38,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/willf/bitset" + "go.uber.org/zap" ) type lockedGaugeAggregation struct { @@ -73,6 +75,8 @@ type GaugeElem struct { // map of the previous consumed values for each timestamp in the buffer. needed to support binary transforms that // need the value from the previous timestamp. consumedValues valuesByTime + + log *zap.Logger } // NewGaugeElem returns a new GaugeElem. @@ -80,6 +84,7 @@ func NewGaugeElem(data ElemData, opts Options) (*GaugeElem, error) { e := &GaugeElem{ elemBase: newElemBase(opts), values: make([]timedGauge, 0, defaultNumAggregations), // in most cases values will have two entries + log: opts.InstrumentOptions().Logger(), } if err := e.ResetSetData(data); err != nil { return nil, err @@ -152,7 +157,7 @@ func (e *GaugeElem) AddValue(timestamp time.Time, value float64, annotation []by // AddUnique adds a metric value from a given source at a given timestamp. // If previous values from the same source have already been added to the // same aggregation, the incoming value is discarded. -//nolint: dupl +// nolint: dupl func (e *GaugeElem) AddUnique( timestamp time.Time, metric aggregated.ForwardedMetric, @@ -213,6 +218,8 @@ func (e *GaugeElem) Consume( onForwardedFlushedFn onForwardingElemFlushedFn, ) bool { resolution := e.sp.Resolution().Window + //e.log.Debug("agg_test, consume, metric:" + string(e.id)) + e.Lock() if e.closed { e.Unlock() @@ -224,7 +231,12 @@ func (e *GaugeElem) Consume( valuesForConsideration := e.values e.values = e.values[:0] for _, value := range valuesForConsideration { + //if strings.Contains(e.ID().String(), "node_network_transmit_queue_length:") { + //e.log.Debug("agg_test, consume value metric:" + string(e.id) + ", startNanos:" + strconv.FormatInt(value.startAtNanos, 10)) + //} + if !isEarlierThanFn(value.startAtNanos, resolution, targetNanos) { + //e.log.Debug("agg_test, appended, metric:" + string(e.id) + ", targetNanos:" + strconv.FormatInt(targetNanos, 10)) e.values = append(e.values, value) continue } @@ -244,6 +256,8 @@ func (e *GaugeElem) Consume( if !expired { // Keep item. Expired values are GC'd below after consuming. 
e.values = append(e.values, value) + } else { + e.log.Debug("agg_test, data has expired, metric: " + string(e.id) + ", startNanos:" + strconv.FormatInt(value.startAtNanos, 10) + ", expiredNanos:" + strconv.FormatInt(targetNanos-e.bufferForPastTimedMetricFn(resolution).Nanoseconds(), 10) + ", timeout limit: " + strconv.FormatInt(e.bufferForPastTimedMetricFn(resolution).Nanoseconds(), 10)) } } canCollect := len(e.values) == 0 && e.tombstoned @@ -308,6 +322,7 @@ func (e *GaugeElem) Consume( if e.parsedPipeline.HasRollup { forwardedAggregationKey, _ := e.ForwardedAggregationKey() + e.log.Debug("agg_test, HasRollup: " + e.parsedPipeline.Remainder.String()) onForwardedFlushedFn(e.onForwardedAggregationWrittenFn, forwardedAggregationKey) } @@ -358,6 +373,9 @@ func (e *GaugeElem) findOrCreate( alignedStart int64, createOpts createAggregationOptions, ) (*lockedGaugeAggregation, error) { + + e.log.Debug("agg_test, findOrCreate:" + string(e.ID()) + ", alignedStart:" + strconv.FormatInt(alignedStart, 10)) + e.RLock() if e.closed { e.RUnlock() diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index 74bbea1855..797877b5f2 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -24,6 +24,7 @@ import ( "container/list" "errors" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -168,6 +169,7 @@ type baseMetricList struct { timestampNanosFn timestampNanosFn closed bool + log *zap.Logger aggregations *list.List lastFlushedNanos int64 toCollect []*list.Element @@ -214,6 +216,7 @@ func newBaseMetricList( timestampNanosFn: timestampNanosFn, aggregations: list.New(), metrics: newMetricListMetrics(scope), + log: opts.InstrumentOptions().Logger(), } l.flushBeforeFn = l.flushBefore l.consumeLocalMetricFn = l.consumeLocalMetric @@ -257,6 +260,11 @@ func (l *baseMetricList) PushBack(value metricElem) (*list.Element, error) { l.Unlock() return nil, errListClosed } + + if strings.Contains(value.ID().String(), "node_network_transmit_queue_length:") { + l.log.Debug("agg_test, push back: " + value.ID().String()) + } + elem := l.aggregations.PushBack(value) if !hasForwardedID { l.Unlock() @@ -370,6 +378,11 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) { // If the element is eligible for collection after the values are // processed, add it to the list of elements to collect. 
elem := e.Value.(metricElem) + + if strings.Contains(elem.ID().String(), "node_network_transmit_queue_length:") { + l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type())) + } + if elem.Consume( beforeNanos, l.isEarlierThanFn, diff --git a/src/aggregator/client/m3msg_client.go b/src/aggregator/client/m3msg_client.go index e69c9a054b..77c60ec19c 100644 --- a/src/aggregator/client/m3msg_client.go +++ b/src/aggregator/client/m3msg_client.go @@ -48,6 +48,7 @@ type M3MsgClient struct { nowFn clock.NowFn shardFn sharding.ShardFn metrics m3msgClientMetrics + log *zap.Logger } type m3msgClient struct { @@ -90,6 +91,7 @@ func NewM3MsgClient(opts Options) (Client, error) { nowFn: opts.ClockOptions().NowFn(), shardFn: opts.ShardFn(), metrics: newM3msgClientMetrics(iOpts.MetricsScope(), iOpts.TimerOptions()), + log: logger, }, nil } @@ -201,6 +203,8 @@ func (c *M3MsgClient) WriteTimedWithStagedMetadatas( metadatas: metadatas, }, } + + //c.log.Debug("agg_test, WriteTimedWithStagedMetadatas:" + string(metric.String())) err := c.write(metric.ID, payload) c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err @@ -319,6 +323,7 @@ func newMessage(pool *messagePool) *message { } // Encode encodes a m3msg payload +// //nolint:gocyclo,gocritic func (m *message) Encode( shard uint32, diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index c063678113..f56d5909af 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -259,8 +259,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp ) a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0] if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 { - a.debugLogMatch("downsampler applying matched rule", - debugLogMatchOptions{Meta: ruleStagedMetadatas}) + //a.debugLogMatch("downsampler applying matched rule", + // debugLogMatchOptions{Meta: ruleStagedMetadatas}) // Collect storage policies for all the current active mapping rules. // TODO: we should convert this to iterate over pointers @@ -273,15 +273,20 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // metrics that are rolled up to ensure the underlying metric // gets written to aggregated namespaces. if pipe.IsMappingRule() { + //a.debugLogMatch( + // "apply mapping rule, pipeline: "+pipe.String(), + // debugLogMatchOptions{}, + //) + for _, sp := range pipe.StoragePolicies { a.mappingRuleStoragePolicies = append(a.mappingRuleStoragePolicies, sp) } } else { - a.debugLogMatch( - "skipping rollup rule in populating active mapping rule policies", - debugLogMatchOptions{}, - ) + //a.debugLogMatch( + // "skipping rollup rule in populating active mapping rule policies, pipeline: "+pipe.String(), + // debugLogMatchOptions{}, + //) } } } @@ -373,8 +378,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp continue } - a.debugLogMatch("downsampler applying default mapping rule", - debugLogMatchOptions{Meta: stagedMetadatas}) + //a.debugLogMatch("downsampler applying default mapping rule", + // debugLogMatchOptions{Meta: stagedMetadatas}) pipelines := stagedMetadatas[len(stagedMetadatas)-1] a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...) 
@@ -396,8 +401,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // if there's a drop policy or if the staged metadata is a no-op. if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() && !a.curr.IsDefault() { // Send to downsampler if we have something in the pipeline. - a.debugLogMatch("downsampler using built mapping staged metadatas", - debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}}) + //a.debugLogMatch("downsampler using built mapping staged metadatas", + // debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}}) if err := a.addSamplesAppenders(tags, a.curr); err != nil { return SamplesAppenderResult{}, err @@ -405,12 +410,35 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp } // Finally, process and deliver staged metadata resulting from rollup rules. + + //a.debugLogMatch("downsampler applying matched rule", + // debugLogMatchOptions{Meta: matchResult.}) + numRollups := matchResult.NumNewRollupIDs() + + //for _, stagedMetadata := range ruleStagedMetadatas { + // for _, pipe := range stagedMetadata.Pipelines { + // Skip rollup rules unless configured otherwise. + // We only want to consider mapping rules here, + // as we still want to apply default mapping rules to + // metrics that are rolled up to ensure the underlying metric + // gets written to aggregated namespaces. + //pipe.Pipeline. + //if !pipe.IsMappingRule() { + // a.debugLogMatch( + // "skipping rollup rule in populating active mapping rule policies", + // debugLogMatchOptions{}, + // ) + //} + //} + //} + for i := 0; i < numRollups; i++ { rollup := matchResult.ForNewRollupIDsAt(i, nowNanos) - a.debugLogMatch("downsampler applying matched rollup rule", - debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID}) + //a.debugLogMatch("downsampler applying matched rollup rule", + // debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID}) + a.multiSamplesAppender.addSamplesAppender(samplesAppender{ agg: a.agg, clientRemote: a.clientRemote, @@ -426,6 +454,11 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp } } dropPolicyApplied := dropApplyResult != metadata.NoDropPolicyPresentResult + + //if dropPolicyApplied { + //a.debugLogMatch("agg_test, drop metric: "+strconv.Itoa(numRollups), debugLogMatchOptions{}) + //} + return SamplesAppenderResult{ SamplesAppender: a.multiSamplesAppender, IsDropPolicyApplied: dropPolicyApplied, diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index 3873c568f7..0963d4887a 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -23,7 +23,7 @@ package ingestm3msg import ( "bytes" "context" - + "encoding/json" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/metrics/metric/id" @@ -41,9 +41,11 @@ import ( "github.com/m3db/m3/src/x/serialize" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - "github.com/uber-go/tally" "go.uber.org/zap" + "strconv" + "strings" + "time" ) // Options configures the ingester. 
@@ -172,6 +174,20 @@ func (op *ingestOp) sample() bool { } func (op *ingestOp) ingest() { + + if !strings.Contains(string(op.id), "node_network_transmit_queue_length:") && !strings.Contains(string(op.id), "logdaemon_numLogMessage_count") { + return + } + out, _ := json.Marshal(op.sp) + op.logger.Debug("agg_test, write to storage:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + + if time.Duration(time.Now().UnixNano()-op.metricNanos) < 15*time.Minute { + op.logger.Debug("agg_test, write to storage less than 15 min:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + } + + //} + //op.logger.Debug("agg_test, write to storage:" + string(out) + ": metric: " + string(op..id)) + if err := op.resetWriteQuery(); err != nil { op.m.ingestInternalError.Inc(1) op.callback.Callback(m3msg.OnRetriableError) @@ -194,16 +210,18 @@ func (op *ingestOp) ingest() { // NB(r): Always log non-retriable errors since they are usually // a very small minority and when they go missing it can be frustrating // not being able to find them (usually bad request errors). - if nonRetryableErr || op.sample() { - op.logger.Error("could not write ingest op", - zap.Error(err), - zap.Bool("retryableError", !nonRetryableErr)) - } + //if nonRetryableErr || op.sample() { + // op.logger.Error("could not write ingest op", + // zap.Error(err), + // zap.Bool("retryableError", !nonRetryableErr)) + //} op.p.Put(op) return } op.m.ingestSuccess.Inc(1) + op.logger.Debug("agg_test, write to storage succeeded:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + op.callback.Callback(m3msg.OnSuccess) op.p.Put(op) } diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index ef435de8a6..6daf058a53 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -22,6 +22,8 @@ package ingest import ( "context" + "encoding/json" + "strings" "sync" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -36,6 +38,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/uber-go/tally" + "go.uber.org/zap" ) var ( @@ -113,8 +116,8 @@ type downsamplerAndWriter struct { store storage.Storage downsampler downsample.Downsampler workerPool xsync.PooledWorkerPool - - metrics downsamplerAndWriterMetrics + logger *zap.Logger + metrics downsamplerAndWriterMetrics } // NewDownsamplerAndWriter creates a new downsampler and writer. 
@@ -132,9 +135,26 @@ func NewDownsamplerAndWriter( metrics: downsamplerAndWriterMetrics{ dropped: scope.Counter("metrics_dropped"), }, + logger: instrumentOpts.Logger(), } } +func (a *downsamplerAndWriter) debugLogMatch(str string) { + //fields := []zapcore.Field{ + // zap.String("tags", a.originalTags.String()), + //} + //if v := opts.RollupID; v != nil { + // fields = append(fields, zap.ByteString("rollupID", v)) + //} + //if v := opts.Meta; v != nil { + // fields = append(fields, stagedMetadatasLogField(v)) + //} + //if v := opts.StoragePolicy; v != policy.EmptyStoragePolicy { + // fields = append(fields, zap.Stringer("storagePolicy", v)) + //} + a.logger.Debug(str) +} + func (d *downsamplerAndWriter) Write( ctx context.Context, tags models.Tags, @@ -149,11 +169,15 @@ func (d *downsamplerAndWriter) Write( ) if d.shouldDownsample(overrides) { + //out, _ := json.Marshal(tags.Tags) + //d.logger.Debug("agg_test, should downsample metrics:" + string(out)) + //if !strings.Contains(string(out), "node_network_transmit_queue_length") { var err error dropUnaggregated, err = d.writeToDownsampler(tags, datapoints, unit, annotation, overrides) if err != nil { multiErr = multiErr.Add(err) } + //} } if dropUnaggregated { @@ -316,6 +340,7 @@ func (d *downsamplerAndWriter) writeToStorage( if err != nil { return err } + return d.store.Write(ctx, writeQuery) } @@ -326,6 +351,10 @@ func (d *downsamplerAndWriter) writeToStorage( ) for _, p := range storagePolicies { + + out, _ := json.Marshal(p) + d.debugLogMatch("write to storage:" + string(out)) + p := p // Capture for goroutine. wg.Add(1) @@ -457,6 +486,13 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( appender.NextMetric() value := iter.Current() + + if !strings.Contains(string(value.Tags.ID()), "node_network_transmit_queue_length") && !strings.Contains(string(value.Tags.ID()), "logdaemon_numLogMessage_count") { + continue + } + + //d.logger.Debug("agg_test, start to aggregate metrics:" + string(value.Tags.ID())) + if err := value.Tags.Validate(); err != nil { multiErr = multiErr.Add(err) continue @@ -504,6 +540,8 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { + //d.logger.Debug("agg_test, aggregate metrics time:" + dp.Timestamp.String()) + switch value.Attributes.M3Type { case ts.M3MetricTypeGauge: if result.ShouldDropTimestamp { diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 6adaad65b4..e2972c1dcb 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -22,6 +22,8 @@ package m3msg import ( "context" + "encoding/json" + "strings" "sync" "github.com/m3db/m3/src/metrics/encoding/protobuf" @@ -112,16 +114,25 @@ func (h *pbHandler) Process(msg consumer.Message) { h.wg.Add(1) r := NewProtobufCallback(msg, dec, h.wg) sp := dec.StoragePolicy() + // If storage policy is blackholed, ack the message immediately and don't // bother passing down the write path. 
for _, blackholeSp := range h.blackholePolicies { if sp.Equivalent(blackholeSp) { + out, _ := json.Marshal(blackholeSp) + h.logger.Debug("dropped message:" + string(out) + ": metrics:" + string(dec.ID())) + h.m.droppedMetricBlackholePolicy.Inc(1) r.Callback(OnSuccess) return } } + if strings.Contains(string(dec.ID()), "node_network_transmit_queue_length:") { + out, _ := json.Marshal(sp) + h.logger.Debug("agg_test, process message:" + string(out) + ": metrics:" + string(dec.ID())) + } + h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), dec.Annotation(), sp, r) } diff --git a/src/metrics/matcher/ruleset.go b/src/metrics/matcher/ruleset.go index fcc037f135..c3dd8b12fe 100644 --- a/src/metrics/matcher/ruleset.go +++ b/src/metrics/matcher/ruleset.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/uber-go/tally" + "go.uber.org/zap" ) // RuleSet manages runtime updates to registered rules and provides @@ -89,6 +90,7 @@ type ruleSet struct { tombstoned bool matcher rules.Matcher metrics ruleSetMetrics + log *zap.Logger } func newRuleSet( @@ -110,6 +112,7 @@ func newRuleSet( version: kv.UninitializedVersion, metrics: newRuleSetMetrics(instrumentOpts.MetricsScope(), instrumentOpts.TimerOptions()), + log: instrumentOpts.Logger(), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(opts.InstrumentOptions()). @@ -195,7 +198,7 @@ func (r *ruleSet) toRuleSet(value kv.Value) (interface{}, error) { if err := value.Unmarshal(r.proto); err != nil { return nil, err } - return rules.NewRuleSetFromProto(value.Version(), r.proto, r.ruleSetOpts) + return rules.NewRuleSetFromProto(value.Version(), r.proto, r.ruleSetOpts, r.log) } // process processes an ruleset update. diff --git a/src/metrics/rules/active_ruleset.go b/src/metrics/rules/active_ruleset.go index 607ae420e6..58e748da22 100644 --- a/src/metrics/rules/active_ruleset.go +++ b/src/metrics/rules/active_ruleset.go @@ -22,9 +22,8 @@ package rules import ( "bytes" + "encoding/json" "fmt" - "sort" - "github.com/m3db/m3/src/metrics/aggregation" "github.com/m3db/m3/src/metrics/filters" "github.com/m3db/m3/src/metrics/metadata" @@ -34,6 +33,10 @@ import ( "github.com/m3db/m3/src/metrics/pipeline/applied" "github.com/m3db/m3/src/query/models" xerrors "github.com/m3db/m3/src/x/errors" + "sort" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // Matcher matches metrics against rules to determine applicable policies. 
@@ -61,6 +64,7 @@ type activeRuleSet struct { tagsFilterOpts filters.TagsFilterOptions newRollupIDFn metricid.NewIDFn isRollupIDFn metricid.MatchIDFn + logger *zap.Logger } func newActiveRuleSet( @@ -70,6 +74,7 @@ func newActiveRuleSet( tagsFilterOpts filters.TagsFilterOptions, newRollupIDFn metricid.NewIDFn, isRollupIDFn metricid.MatchIDFn, + logger *zap.Logger, ) *activeRuleSet { uniqueCutoverTimes := make(map[int64]struct{}) for _, mappingRule := range mappingRules { @@ -97,6 +102,7 @@ func newActiveRuleSet( tagsFilterOpts: tagsFilterOpts, newRollupIDFn: newRollupIDFn, isRollupIDFn: isRollupIDFn, + logger: logger, } } @@ -221,6 +227,11 @@ func (as *activeRuleSet) ReverseMatch( return NewMatchResult(as.version, nextCutoverNanos, forExistingID, nil, keepOriginal) } +func stagedMetadatasLogField(sm metadata.PipelineMetadatas) zapcore.Field { + out, _ := json.Marshal(sm) + return zap.Any("pipeline:", out) +} + // NB(xichen): can further consolidate pipelines with the same aggregation ID // and same applied pipeline but different storage policies to reduce amount of // data that needed to be stored in memory and sent across the wire. @@ -234,6 +245,13 @@ func (as *activeRuleSet) forwardMatchAt( merge(rollupResults.forExistingID). unique(). toStagedMetadata() + + //out, _ := json.Marshal(rollupResults) + //as.logger.Debug("agg_test, rollupResults," + string(out)) + // + //mapout, _ := json.Marshal(forExistingID) + //as.logger.Debug("agg_test, forExistingID," + string(mapout)) + forNewRollupIDs := make([]IDWithMetadatas, 0, len(rollupResults.forNewRollupIDs)) for _, idWithMatchResult := range rollupResults.forNewRollupIDs { stagedMetadata := idWithMatchResult.matchResults.unique().toStagedMetadata() @@ -307,6 +325,10 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64) rollupResu ) for _, rollupRule := range as.rollupRules { + + //out, _ := json.Marshal(rollupRule) + //as.logger.Debug("agg_test, iterate rollupRules," + string(out)) + snapshot := rollupRule.activeSnapshot(timeNanos) if snapshot == nil { continue @@ -333,6 +355,9 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64) rollupResu } for _, target := range snapshot.targets { + //out, _ := json.Marshal(target) + //as.logger.Debug("agg_test, iterate target," + string(out)) + rollupTargets = append(rollupTargets, target.clone()) tags = append(tags, snapshot.tags) } @@ -375,7 +400,9 @@ func (as *activeRuleSet) toRollupResults( ) for idx, target := range targets { + pipeline := target.Pipeline + // A rollup target should always have a non-empty pipeline but // just being defensive here. if pipeline.IsEmpty() { @@ -390,6 +417,10 @@ func (as *activeRuleSet) toRollupResults( firstOp = pipeline.At(0) toApply mpipeline.Pipeline ) + + //out, _ := json.Marshal(target) + //as.logger.Debug("agg_test, iterate toRollupResults target," + string(out) + ", firstOp:" + firstOp.String() + ", numSteps:" + strconv.Itoa(numSteps)) + switch firstOp.Type { case mpipeline.AggregationOpType: aggregationID, err = aggregation.CompressTypes(firstOp.Aggregation.Type) @@ -412,6 +443,9 @@ func (as *activeRuleSet) toRollupResults( tags[idx], matchRollupTargetOptions{generateRollupID: true}, ) + + //as.logger.Debug("agg_test, toRollupResults target match" + strconv.FormatBool(matched) + ", rollupID:" + string(rollupID)) + if !matched { // The incoming metric ID did not match the rollup target. 
continue @@ -424,7 +458,14 @@ func (as *activeRuleSet) toRollupResults( continue } tagPairs = tagPairs[:0] + + //toApply_out, _ := json.Marshal(toApply) + //as.logger.Debug("agg_test, toRollupResults to apply:" + string(toApply_out)) + applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs, tags[idx]) + + //as.logger.Debug("agg_test, toRollupResults applied:" + string(applied.String())) + if err != nil { err = fmt.Errorf("failed to apply id %s to pipeline %v: %v", id, toApply, err) multiErr = multiErr.Add(err) @@ -436,6 +477,9 @@ func (as *activeRuleSet) toRollupResults( Pipeline: applied, ResendEnabled: target.ResendEnabled, } + + //as.logger.Debug("agg_test, toRollupResults pipeline: " + newPipeline.String()) + if rollupID == nil { // The applied pipeline applies to the incoming ID. pipelines = append(pipelines, newPipeline) @@ -448,6 +492,10 @@ func (as *activeRuleSet) toRollupResults( cutoverNanos: cutoverNanos, pipelines: []metadata.PipelineMetadata{newPipeline}, } + + //match_out, _ := json.Marshal(matchResults) + //as.logger.Debug("agg_test, toRollupResults matchResults," + string(match_out)) + newRollupIDResult := idWithMatchResults{id: rollupID, matchResults: matchResults} newRollupIDResults = append(newRollupIDResults, newRollupIDResult) } @@ -484,11 +532,18 @@ func (as *activeRuleSet) matchRollupTarget( nameTagValue []byte ) + //rollupOp_out, _ := json.Marshal(rollupOp) + //as.logger.Debug("agg_test, matchRollupTarget rollupOp_out: " + string(rollupOp_out)) + defer sortedTagIter.Close() // Iterate through each tag, looking to match it with corresponding filter tags on the rule for hasMoreTags := sortedTagIter.Next(); hasMoreTags; hasMoreTags = sortedTagIter.Next() { + tagName, tagVal := sortedTagIter.Current() + + //as.logger.Debug("agg_test, matchRollupTarget tagName: " + string(tagName) + ", tagVal: " + string(tagVal)) + // nolint:gosimple isNameTag := bytes.Compare(tagName, nameTagName) == 0 if isNameTag { @@ -532,6 +587,8 @@ func (as *activeRuleSet) matchRollupTarget( } res := bytes.Compare(tagName, rollupTags[matchTagIdx]) + //as.logger.Debug("agg_test, matchRollupTarget match: " + strconv.Itoa(res) + ", tagName" + string(tagName) + ", rollupTag:" + string(rollupTags[matchTagIdx])) + if res == 0 { // Skip excluded tag. 
matchTagIdx++ @@ -556,6 +613,10 @@ func (as *activeRuleSet) matchRollupTarget( } newName := rollupOp.NewName(nameTagValue) + + //tagPairs_out, _ := json.Marshal(tagPairs) + //as.logger.Debug("agg_test, matchRollupTarget tagPairs: " + string(tagPairs_out)) + return as.newRollupIDFn(newName, tagPairs), true } @@ -585,6 +646,9 @@ func (as *activeRuleSet) applyIDToPipeline( tags, matchRollupTargetOptions{generateRollupID: true}, ) + //rollupOp_out, _ := json.Marshal(rollupOp) + //as.logger.Debug("agg_test, applyIDToPipeline target match: " + strconv.FormatBool(matched) + ", rollupOp: " + string(rollupOp_out) + ", rollupID: " + string(rollupID)) + if !matched { err := fmt.Errorf("existing tag pairs %s do not contain all rollup tags %s", sortedTagPairBytes, rollupOp.Tags) return applied.Pipeline{}, err @@ -593,11 +657,16 @@ func (as *activeRuleSet) applyIDToPipeline( Type: mpipeline.RollupOpType, Rollup: applied.RollupOp{ID: rollupID, AggregationID: rollupOp.AggregationID}, } + + //as.logger.Debug("agg_test, applyIDToPipeline opUnion : " + string(opUnion.String())) + default: return applied.Pipeline{}, fmt.Errorf("unexpected pipeline op type: %v", pipelineOp.Type) } operations = append(operations, opUnion) } + //operations_out, _ := json.Marshal(operations) + //as.logger.Debug("agg_test, applyIDToPipeline opUnion : " + operations) return applied.NewPipeline(operations), nil } diff --git a/src/metrics/rules/ruleset.go b/src/metrics/rules/ruleset.go index 621dc70ff9..a5d75aacfc 100644 --- a/src/metrics/rules/ruleset.go +++ b/src/metrics/rules/ruleset.go @@ -37,6 +37,7 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/pborman/uuid" + "go.uber.org/zap" ) const ( @@ -145,10 +146,11 @@ type ruleSet struct { tagsFilterOpts filters.TagsFilterOptions newRollupIDFn metricid.NewIDFn isRollupIDFn metricid.MatchIDFn + log *zap.Logger } // NewRuleSetFromProto creates a new RuleSet from a proto object. 
-func NewRuleSetFromProto(version int, rs *rulepb.RuleSet, opts Options) (RuleSet, error) { +func NewRuleSetFromProto(version int, rs *rulepb.RuleSet, opts Options, log *zap.Logger) (RuleSet, error) { if rs == nil { return nil, errNilRuleSetProto } @@ -183,6 +185,7 @@ func NewRuleSetFromProto(version int, rs *rulepb.RuleSet, opts Options) (RuleSet tagsFilterOpts: tagsFilterOpts, newRollupIDFn: opts.NewRollupIDFn(), isRollupIDFn: opts.IsRollupIDFn(), + log: log, }, nil } @@ -226,6 +229,7 @@ func (rs *ruleSet) ActiveSet(timeNanos int64) Matcher { rs.tagsFilterOpts, rs.newRollupIDFn, rs.isRollupIDFn, + rs.log, ) } diff --git a/src/metrics/rules/store/kv/store.go b/src/metrics/rules/store/kv/store.go index 78132c347d..f7b51221be 100644 --- a/src/metrics/rules/store/kv/store.go +++ b/src/metrics/rules/store/kv/store.go @@ -77,7 +77,7 @@ func (s *store) ReadRuleSet(nsName string) (rules.RuleSet, error) { return nil, fmt.Errorf("could not fetch RuleSet %s: %v", nsName, err.Error()) } - rs, err := rules.NewRuleSetFromProto(version, &ruleSet, rules.NewOptions()) + rs, err := rules.NewRuleSetFromProto(version, &ruleSet, rules.NewOptions(), nil) if err != nil { return nil, fmt.Errorf("could not fetch RuleSet %s: %v", nsName, err.Error()) } diff --git a/src/query/api/v1/handler/json/write.go b/src/query/api/v1/handler/json/write.go index 934561a345..3c05d8aa7d 100644 --- a/src/query/api/v1/handler/json/write.go +++ b/src/query/api/v1/handler/json/write.go @@ -93,10 +93,10 @@ func (h *WriteJSONHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err := h.store.Write(r.Context(), writeQuery); err != nil { - logger := logging.WithContext(r.Context(), h.instrumentOpts) - logger.Error("write error", - zap.String("remoteAddr", r.RemoteAddr), - zap.Error(err)) + //logger := logging.WithContext(r.Context(), h.instrumentOpts) + //logger.Error("write error", + // zap.String("remoteAddr", r.RemoteAddr), + // zap.Error(err)) xhttp.WriteError(w, err) } } From 684f2ac8e87c8db350b6e319b5a954e5375e6134 Mon Sep 17 00:00:00 2001 From: David Yuan Date: Mon, 19 Sep 2022 16:53:44 -0700 Subject: [PATCH 2/3] Add container label for the high cpu alerts --- src/aggregator/aggregator/entry.go | 133 +++++++++--------- src/aggregator/aggregator/list.go | 4 +- .../downsample/metrics_appender.go | 12 +- .../m3coordinator/ingest/m3msg/ingest.go | 24 ++-- .../services/m3coordinator/ingest/write.go | 12 +- 5 files changed, 94 insertions(+), 91 deletions(-) diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go index 85ff5c8303..893f2b3790 100644 --- a/src/aggregator/aggregator/entry.go +++ b/src/aggregator/aggregator/entry.go @@ -23,7 +23,6 @@ package aggregator import ( "container/list" "errors" - "fmt" "sync" "time" @@ -169,6 +168,7 @@ type entryMetrics struct { } // NewEntryMetrics creates new entry metrics. +// //nolint:golint,revive func NewEntryMetrics(scope tally.Scope) *entryMetrics { scope = scope.SubScope("entry") @@ -819,45 +819,45 @@ func (e *Entry) checkTimestampForTimedMetric( currNanos int64, resolution time.Duration, ) error { - metricTimeNanos := metric.TimeNanos - e.metrics.timed.ingestDelay.RecordDuration(time.Duration(e.nowFn().UnixNano() - metricTimeNanos)) - timedBufferFuture := e.opts.BufferForFutureTimedMetric() - if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() { - e.metrics.timed.tooFarInTheFuture.Inc(1) - if !e.opts.VerboseErrors() { - // Don't return verbose errors if not enabled. 
- return errTooFarInTheFuture - } - timestamp := time.Unix(0, metricTimeNanos) - futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds()) - err := fmt.Errorf("datapoint for aggregation too far in future: "+ - "off_by=%s, timestamp=%s, future_limit=%s, "+ - "timestamp_unix_nanos=%d, future_limit_unix_nanos=%d", - timestamp.Sub(futureLimit).String(), - timestamp.Format(errTimestampFormat), - futureLimit.Format(errTimestampFormat), - timestamp.UnixNano(), futureLimit.UnixNano()) - return xerrors.NewRenamedError(errTooFarInTheFuture, err) - } - bufferPastFn := e.opts.BufferForPastTimedMetricFn() - timedBufferPast := bufferPastFn(resolution) - if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() { - e.metrics.timed.tooFarInThePast.Inc(1) - if !e.opts.VerboseErrors() { - // Don't return verbose errors if not enabled. - return errTooFarInThePast - } - timestamp := time.Unix(0, metricTimeNanos) - pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds()) - err := fmt.Errorf("datapoint for aggregation too far in past: "+ - "off_by=%s, timestamp=%s, past_limit=%s, "+ - "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", - pastLimit.Sub(timestamp).String(), - timestamp.Format(errTimestampFormat), - pastLimit.Format(errTimestampFormat), - timestamp.UnixNano(), pastLimit.UnixNano()) - return xerrors.NewRenamedError(errTooFarInThePast, err) - } + //metricTimeNanos := metric.TimeNanos + //e.metrics.timed.ingestDelay.RecordDuration(time.Duration(e.nowFn().UnixNano() - metricTimeNanos)) + //timedBufferFuture := e.opts.BufferForFutureTimedMetric() + //if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() { + // e.metrics.timed.tooFarInTheFuture.Inc(1) + // if !e.opts.VerboseErrors() { + // // Don't return verbose errors if not enabled. + // return errTooFarInTheFuture + // } + // timestamp := time.Unix(0, metricTimeNanos) + // futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds()) + // err := fmt.Errorf("datapoint for aggregation too far in future: "+ + // "off_by=%s, timestamp=%s, future_limit=%s, "+ + // "timestamp_unix_nanos=%d, future_limit_unix_nanos=%d", + // timestamp.Sub(futureLimit).String(), + // timestamp.Format(errTimestampFormat), + // futureLimit.Format(errTimestampFormat), + // timestamp.UnixNano(), futureLimit.UnixNano()) + // return xerrors.NewRenamedError(errTooFarInTheFuture, err) + //} + //bufferPastFn := e.opts.BufferForPastTimedMetricFn() + //timedBufferPast := bufferPastFn(resolution) + //if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() { + // e.metrics.timed.tooFarInThePast.Inc(1) + // if !e.opts.VerboseErrors() { + // // Don't return verbose errors if not enabled. 
+ // return errTooFarInThePast + // } + // timestamp := time.Unix(0, metricTimeNanos) + // pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds()) + // err := fmt.Errorf("datapoint for aggregation too far in past: "+ + // "off_by=%s, timestamp=%s, past_limit=%s, "+ + // "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", + // pastLimit.Sub(timestamp).String(), + // timestamp.Format(errTimestampFormat), + // pastLimit.Format(errTimestampFormat), + // timestamp.UnixNano(), pastLimit.UnixNano()) + // return xerrors.NewRenamedError(errTooFarInThePast, err) + //} return nil } @@ -1005,33 +1005,36 @@ func (e *Entry) checkLatenessForForwardedMetric( resolution time.Duration, numForwardedTimes int, ) error { - metricTimeNanos := metric.TimeNanos - maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn() - maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes) - if metadata.ResendEnabled { - maxLatenessAllowed = e.opts.BufferForPastTimedMetricFn()(resolution) - } - if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() { - return nil - } - - e.metrics.forwarded.arrivedTooLate.Inc(1) + //metricTimeNanos := metric.TimeNanos + //maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn() + //maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes) + //if metadata.ResendEnabled { + // maxLatenessAllowed = e.opts.BufferForPastTimedMetricFn()(resolution) + //} - if !e.opts.VerboseErrors() { - // Don't return verbose errors if not enabled. - return errArrivedTooLate - } + return nil - timestamp := time.Unix(0, metricTimeNanos) - pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds()) - err := fmt.Errorf("datapoint for aggregation forwarded too late: "+ - "id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+ - "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", - metric.ID, maxLatenessAllowed.String(), - timestamp.Format(errTimestampFormat), - pastLimit.Format(errTimestampFormat), - timestamp.UnixNano(), pastLimit.UnixNano()) - return xerrors.NewRenamedError(errArrivedTooLate, err) + //if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() { + // return nil + //} + // + //e.metrics.forwarded.arrivedTooLate.Inc(1) + // + //if !e.opts.VerboseErrors() { + // // Don't return verbose errors if not enabled. + // return errArrivedTooLate + //} + // + //timestamp := time.Unix(0, metricTimeNanos) + //pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds()) + //err := fmt.Errorf("datapoint for aggregation forwarded too late: "+ + // "id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+ + // "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", + // metric.ID, maxLatenessAllowed.String(), + // timestamp.Format(errTimestampFormat), + // pastLimit.Format(errTimestampFormat), + // timestamp.UnixNano(), pastLimit.UnixNano()) + //return xerrors.NewRenamedError(errArrivedTooLate, err) } func (e *Entry) updateForwardMetadataWithLock( diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index 797877b5f2..6bfc71883f 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -379,9 +379,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) { // processed, add it to the list of elements to collect. 
elem := e.Value.(metricElem) - if strings.Contains(elem.ID().String(), "node_network_transmit_queue_length:") { - l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type())) - } + l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type())) if elem.Consume( beforeNanos, diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index f56d5909af..a1fb264cf2 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -259,8 +259,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp ) a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0] if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 { - //a.debugLogMatch("downsampler applying matched rule", - // debugLogMatchOptions{Meta: ruleStagedMetadatas}) + a.debugLogMatch("downsampler applying matched rule", + debugLogMatchOptions{Meta: ruleStagedMetadatas}) // Collect storage policies for all the current active mapping rules. // TODO: we should convert this to iterate over pointers @@ -302,7 +302,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // if so then skip aggregating for that storage policy. // This is what we calculated in the step above. // 2. Any type of drop rule has been set. Drop rules should mean that the auto-mapping rules are ignored. - if !a.curr.Pipelines.IsDropPolicySet() { + if !a.curr.Pipelines.IsDropPolicySet() && false { // No drop rule has been set as part of rule matching. for idx, stagedMetadatasProto := range a.defaultStagedMetadatasProtos { // NB(r): Need to take copy of default staged metadatas as we @@ -376,10 +376,12 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp a.debugLogMatch("downsampler skipping default mapping rule completely", debugLogMatchOptions{Meta: stagedMetadataBeforeFilter}) continue + } else { + continue } - //a.debugLogMatch("downsampler applying default mapping rule", - // debugLogMatchOptions{Meta: stagedMetadatas}) + a.debugLogMatch("downsampler applying default mapping rule", + debugLogMatchOptions{Meta: stagedMetadatas}) pipelines := stagedMetadatas[len(stagedMetadatas)-1] a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...) diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index 0963d4887a..4d90189b1d 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -23,7 +23,6 @@ package ingestm3msg import ( "bytes" "context" - "encoding/json" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/metrics/metric/id" @@ -43,9 +42,6 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/uber-go/tally" "go.uber.org/zap" - "strconv" - "strings" - "time" ) // Options configures the ingester. 
@@ -175,15 +171,17 @@ func (op *ingestOp) sample() bool { func (op *ingestOp) ingest() { - if !strings.Contains(string(op.id), "node_network_transmit_queue_length:") && !strings.Contains(string(op.id), "logdaemon_numLogMessage_count") { - return - } - out, _ := json.Marshal(op.sp) - op.logger.Debug("agg_test, write to storage:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + //if !strings.Contains(string(op.id), "node_network_transmit_queue_length:") && + // !strings.Contains(string(op.id), "logdaemon_numLogMessage_count") && + // !strings.Contains(string(op.id), "logdaemon_rateLimitRemaining_bytes") { + // return + //} + //out, _ := json.Marshal(op.sp) + //op.logger.Debug("agg_test, write to storage:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) - if time.Duration(time.Now().UnixNano()-op.metricNanos) < 15*time.Minute { - op.logger.Debug("agg_test, write to storage less than 15 min:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) - } + //if time.Duration(time.Now().UnixNano()-op.metricNanos) < 15*time.Minute { + // op.logger.Debug("agg_test, write to storage less than 15 min:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + //} //} //op.logger.Debug("agg_test, write to storage:" + string(out) + ": metric: " + string(op..id)) @@ -220,7 +218,7 @@ func (op *ingestOp) ingest() { return } op.m.ingestSuccess.Inc(1) - op.logger.Debug("agg_test, write to storage succeeded:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) + //op.logger.Debug("agg_test, write to storage succeeded:" + string(out) + ": metric: " + string(op.id) + ", metricNanos" + strconv.FormatInt(op.metricNanos, 10)) op.callback.Callback(m3msg.OnSuccess) op.p.Put(op) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 6daf058a53..10dbeec58b 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -23,7 +23,6 @@ package ingest import ( "context" "encoding/json" - "strings" "sync" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -487,9 +486,12 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( value := iter.Current() - if !strings.Contains(string(value.Tags.ID()), "node_network_transmit_queue_length") && !strings.Contains(string(value.Tags.ID()), "logdaemon_numLogMessage_count") { - continue - } + //if !strings.Contains(string(value.Tags.ID()), "node_network_transmit_queue_length:") && + // !strings.Contains(string(value.Tags.ID()), "logdaemon_numLogMessage_count") && + // !strings.Contains(string(value.Tags.ID()), "logdaemon_rateLimitRemaining_bytes") && + // !strings.Contains(string(value.Tags.ID()), "logdaemon_streamLatency_milliseconds_bucket") { + // continue + //} //d.logger.Debug("agg_test, start to aggregate metrics:" + string(value.Tags.ID())) @@ -540,7 +542,7 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { - //d.logger.Debug("agg_test, aggregate metrics time:" + dp.Timestamp.String()) + //d.logger.Debug("agg_test, aggregate metrics time:" + dp.Timestamp.String() + ", tags" + value.Tags.String()) switch value.Attributes.M3Type { case ts.M3MetricTypeGauge: From 2487113a269addffbe87521128e469be00b84bc2 Mon Sep 17 00:00:00 2001 From: David Yuan Date: Mon, 
23 Jan 2023 13:21:59 -0800 Subject: [PATCH 3/3] add test log --- src/aggregator/aggregator/entry.go | 137 +++++++++--------- src/aggregator/aggregator/forwarded_writer.go | 9 +- src/aggregator/aggregator/gauge_elem_gen.go | 34 +++-- src/aggregator/aggregator/list.go | 9 +- .../server/m3msg/protobuf_handler.go | 2 +- 5 files changed, 105 insertions(+), 86 deletions(-) diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go index 893f2b3790..58f949ad82 100644 --- a/src/aggregator/aggregator/entry.go +++ b/src/aggregator/aggregator/entry.go @@ -23,6 +23,7 @@ package aggregator import ( "container/list" "errors" + "fmt" "sync" "time" @@ -819,46 +820,46 @@ func (e *Entry) checkTimestampForTimedMetric( currNanos int64, resolution time.Duration, ) error { - //metricTimeNanos := metric.TimeNanos - //e.metrics.timed.ingestDelay.RecordDuration(time.Duration(e.nowFn().UnixNano() - metricTimeNanos)) - //timedBufferFuture := e.opts.BufferForFutureTimedMetric() - //if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() { - // e.metrics.timed.tooFarInTheFuture.Inc(1) - // if !e.opts.VerboseErrors() { - // // Don't return verbose errors if not enabled. - // return errTooFarInTheFuture - // } - // timestamp := time.Unix(0, metricTimeNanos) - // futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds()) - // err := fmt.Errorf("datapoint for aggregation too far in future: "+ - // "off_by=%s, timestamp=%s, future_limit=%s, "+ - // "timestamp_unix_nanos=%d, future_limit_unix_nanos=%d", - // timestamp.Sub(futureLimit).String(), - // timestamp.Format(errTimestampFormat), - // futureLimit.Format(errTimestampFormat), - // timestamp.UnixNano(), futureLimit.UnixNano()) - // return xerrors.NewRenamedError(errTooFarInTheFuture, err) - //} - //bufferPastFn := e.opts.BufferForPastTimedMetricFn() - //timedBufferPast := bufferPastFn(resolution) - //if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() { - // e.metrics.timed.tooFarInThePast.Inc(1) - // if !e.opts.VerboseErrors() { - // // Don't return verbose errors if not enabled. - // return errTooFarInThePast - // } - // timestamp := time.Unix(0, metricTimeNanos) - // pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds()) - // err := fmt.Errorf("datapoint for aggregation too far in past: "+ - // "off_by=%s, timestamp=%s, past_limit=%s, "+ - // "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", - // pastLimit.Sub(timestamp).String(), - // timestamp.Format(errTimestampFormat), - // pastLimit.Format(errTimestampFormat), - // timestamp.UnixNano(), pastLimit.UnixNano()) - // return xerrors.NewRenamedError(errTooFarInThePast, err) - //} - return nil + metricTimeNanos := metric.TimeNanos + e.metrics.timed.ingestDelay.RecordDuration(time.Duration(e.nowFn().UnixNano() - metricTimeNanos)) + timedBufferFuture := e.opts.BufferForFutureTimedMetric() + if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() { + e.metrics.timed.tooFarInTheFuture.Inc(1) + if !e.opts.VerboseErrors() { + // Don't return verbose errors if not enabled. 
+			return errTooFarInTheFuture
+		}
+		timestamp := time.Unix(0, metricTimeNanos)
+		futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in future: "+
+			"off_by=%s, timestamp=%s, future_limit=%s, "+
+			"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
+			timestamp.Sub(futureLimit).String(),
+			timestamp.Format(errTimestampFormat),
+			futureLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), futureLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInTheFuture, err)
+	}
+	bufferPastFn := e.opts.BufferForPastTimedMetricFn()
+	timedBufferPast := bufferPastFn(resolution)
+	if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() {
+		e.metrics.timed.tooFarInThePast.Inc(1)
+		if !e.opts.VerboseErrors() {
+			// Don't return verbose errors if not enabled.
+			return errTooFarInThePast
+		}
+		timestamp := time.Unix(0, metricTimeNanos)
+		pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in past: "+
+			"off_by=%s, timestamp=%s, past_limit=%s, "+
+			"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
+			pastLimit.Sub(timestamp).String(),
+			timestamp.Format(errTimestampFormat),
+			pastLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), pastLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInThePast, err)
+	}
+	return nil
 }
 
 func (e *Entry) updateTimedMetadataWithLock(
@@ -1005,36 +1006,36 @@ func (e *Entry) checkLatenessForForwardedMetric(
 	resolution time.Duration,
 	numForwardedTimes int,
 ) error {
-	//metricTimeNanos := metric.TimeNanos
-	//maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn()
-	//maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
-	//if metadata.ResendEnabled {
-	//	maxLatenessAllowed = e.opts.BufferForPastTimedMetricFn()(resolution)
-	//}
+	metricTimeNanos := metric.TimeNanos
+	maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn()
+	maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
+	if metadata.ResendEnabled {
+		maxLatenessAllowed = e.opts.BufferForPastTimedMetricFn()(resolution)
+	}
 
-	return nil
+	//return nil
+
+	if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
+		return nil
+	}
+
+	e.metrics.forwarded.arrivedTooLate.Inc(1)
+
+	if !e.opts.VerboseErrors() {
+		// Don't return verbose errors if not enabled.
-	//	return errArrivedTooLate
-	//}
-	//
-	//timestamp := time.Unix(0, metricTimeNanos)
-	//pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
-	//err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
-	//	"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
-	//	"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
-	//	metric.ID, maxLatenessAllowed.String(),
-	//	timestamp.Format(errTimestampFormat),
-	//	pastLimit.Format(errTimestampFormat),
-	//	timestamp.UnixNano(), pastLimit.UnixNano())
-	//return xerrors.NewRenamedError(errArrivedTooLate, err)
+	timestamp := time.Unix(0, metricTimeNanos)
+	pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
+	err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
+		"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
+		"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
+		metric.ID, maxLatenessAllowed.String(),
+		timestamp.Format(errTimestampFormat),
+		pastLimit.Format(errTimestampFormat),
+		timestamp.UnixNano(), pastLimit.UnixNano())
+	return xerrors.NewRenamedError(errArrivedTooLate, err)
 }
 
 func (e *Entry) updateForwardMetadataWithLock(
diff --git a/src/aggregator/aggregator/forwarded_writer.go b/src/aggregator/aggregator/forwarded_writer.go
index 5e54e12406..4b49ba4bf2 100644
--- a/src/aggregator/aggregator/forwarded_writer.go
+++ b/src/aggregator/aggregator/forwarded_writer.go
@@ -23,6 +23,8 @@ package aggregator
 import (
 	"errors"
 	"fmt"
+	"strconv"
+	"strings"
 
 	"github.com/m3db/m3/src/aggregator/client"
 	"github.com/m3db/m3/src/aggregator/hash"
@@ -505,7 +507,7 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error {
 		return nil
 	}
 
-	agg.logger.Debug("agg_test, onDone, aggregationKey:" + key.pipeline.String())
+	//agg.logger.Debug("agg_test, onDone, aggregationKey:" + key.pipeline.String())
 
 	if agg.byKey[idx].currRefCnt == agg.byKey[idx].totalRefCnt {
 		var (
@@ -534,8 +536,9 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error {
 			}
 			b.version++
 
-			agg.logger.Debug("agg_test, onDone, metric key:" + string(metric.String()))
-
+			if strings.Contains(metric.String(), "rulemanager_latest_rule_evaluation_time") {
+				agg.logger.Debug("agg_test, onDone, metric key:" + string(metric.String()) + ", timeNano:" + strconv.FormatInt(metric.TimeNanos, 10))
+			}
 			if err := agg.client.WriteForwarded(metric, meta); err != nil {
 				multiErr = multiErr.Add(err)
 				agg.metrics.onDoneWriteErrors.Inc(1)
diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go
index 6bd0a7dc54..a378922466 100644
--- a/src/aggregator/aggregator/gauge_elem_gen.go
+++ b/src/aggregator/aggregator/gauge_elem_gen.go
@@ -28,6 +28,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -231,12 +232,12 @@ func (e *GaugeElem) Consume(
 	valuesForConsideration := e.values
 	e.values = e.values[:0]
 	for _, value := range valuesForConsideration {
-		//if strings.Contains(e.ID().String(), "node_network_transmit_queue_length:") {
-		//e.log.Debug("agg_test, consume value metric:" + string(e.id) + ", startNanos:" + strconv.FormatInt(value.startAtNanos, 10))
-		//}
+		if strings.Contains(e.ID().String(), "rulemanager_latest_rule_evaluation_time") {
+			e.log.Debug("agg_test, consume value metric:" + string(e.id) + ", startNanos:" + strconv.FormatInt(value.startAtNanos, 10))
+		}
 		if !isEarlierThanFn(value.startAtNanos, resolution, targetNanos) {
-			//e.log.Debug("agg_test, appended, metric:" + string(e.id) + ", targetNanos:" + strconv.FormatInt(targetNanos, 10))
+
 			e.values = append(e.values, value)
 			continue
 		}
@@ -246,6 +247,10 @@ func (e *GaugeElem) Consume(
 			// since any metrics intended for this value are rejected for being too late.
 			expiredNanos := targetNanos - e.bufferForPastTimedMetricFn(resolution).Nanoseconds()
 			expired = value.startAtNanos < expiredNanos
+
+			if strings.Contains(e.ID().String(), "rulemanager_latest_rule_evaluation_time") {
+				e.log.Debug("agg_test, resend is enabled, metric:" + string(e.id) + ", expiredNanos:" + strconv.FormatInt(expiredNanos, 10))
+			}
 		}
 
 		// Modify the by value copy with whether it needs time flush and accumulate.
@@ -256,8 +261,6 @@ func (e *GaugeElem) Consume(
 		if !expired {
 			// Keep item. Expired values are GC'd below after consuming.
 			e.values = append(e.values, value)
-		} else {
-			e.log.Debug("agg_test, data has expired, metric: " + string(e.id) + ", startNanos:" + strconv.FormatInt(value.startAtNanos, 10) + ", expiredNanos:" + strconv.FormatInt(targetNanos-e.bufferForPastTimedMetricFn(resolution).Nanoseconds(), 10) + ", timeout limit: " + strconv.FormatInt(e.bufferForPastTimedMetricFn(resolution).Nanoseconds(), 10))
 		}
 	}
 	canCollect := len(e.values) == 0 && e.tombstoned
@@ -296,6 +299,11 @@ func (e *GaugeElem) Consume(
 		// Closes the aggregation object after it's processed.
 		if expired {
+
+			if strings.Contains(e.ID().String(), "rulemanager_latest_rule_evaluation_time") {
+				e.log.Debug("agg_test, resend is enabled, metric:" + string(e.id) + ", startAtNanos:" + strconv.FormatInt(e.toConsume[i].startAtNanos, 10))
+			}
+
 			// Cleanup expired item.
 			e.toConsume[i].lockedAgg.closed = true
 			e.toConsume[i].lockedAgg.aggregation.Close()
@@ -322,7 +330,9 @@ func (e *GaugeElem) Consume(
 	if e.parsedPipeline.HasRollup {
 		forwardedAggregationKey, _ := e.ForwardedAggregationKey()
-		e.log.Debug("agg_test, HasRollup: " + e.parsedPipeline.Remainder.String())
+		if strings.Contains(e.ID().String(), "rulemanager_latest_rule_evaluation_time") {
+			e.log.Debug("agg_test, HasRollup: " + e.parsedPipeline.Remainder.String())
+		}
 		onForwardedFlushedFn(e.onForwardedAggregationWrittenFn, forwardedAggregationKey)
 	}
@@ -374,8 +384,6 @@ func (e *GaugeElem) findOrCreate(
 	createOpts createAggregationOptions,
 ) (*lockedGaugeAggregation, error) {
-
-	e.log.Debug("agg_test, findOrCreate:" + string(e.ID()) + ", alignedStart:" + strconv.FormatInt(alignedStart, 10))
-
 	e.RLock()
 	if e.closed {
 		e.RUnlock()
@@ -397,6 +405,9 @@ func (e *GaugeElem) findOrCreate(
 	idx, found = e.indexOfWithLock(alignedStart)
 	if found {
 		agg := e.values[idx].lockedAgg
+		if strings.Contains(string(e.ID()), "rulemanager_latest_rule_evaluation_time") {
+			e.log.Debug("agg_test, findOrCreate, found! :" + string(e.ID()) + ", startAtNanos:" + strconv.FormatInt(e.values[idx].startAtNanos, 10))
+		}
 		e.Unlock()
 		return agg, nil
 	}
@@ -430,6 +441,11 @@ func (e *GaugeElem) findOrCreate(
 		},
 	}
 	agg := e.values[idx].lockedAgg
+
+	if strings.Contains(string(e.ID()), "rulemanager_latest_rule_evaluation_time") {
+		e.log.Debug("agg_test, findOrCreate, not found! :" + string(e.ID()) + ", startAtNanos:" + strconv.FormatInt(alignedStart, 10))
+	}
+
 	e.Unlock()
 	return agg, nil
 }
diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go
index 6bfc71883f..4df73c1f72 100644
--- a/src/aggregator/aggregator/list.go
+++ b/src/aggregator/aggregator/list.go
@@ -24,7 +24,6 @@ import (
 	"container/list"
 	"errors"
 	"fmt"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -261,9 +260,9 @@ func (l *baseMetricList) PushBack(value metricElem) (*list.Element, error) {
 		return nil, errListClosed
 	}
 
-	if strings.Contains(value.ID().String(), "node_network_transmit_queue_length:") {
-		l.log.Debug("agg_test, push back: " + value.ID().String())
-	}
+	//if strings.Contains(value.ID().String(), "node_network_transmit_queue_length:") {
+	//	l.log.Debug("agg_test, push back: " + value.ID().String())
+	//}
 
 	elem := l.aggregations.PushBack(value)
 	if !hasForwardedID {
@@ -379,7 +378,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
 		// processed, add it to the list of elements to collect.
 		elem := e.Value.(metricElem)
 
-		l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type()))
+		//l.log.Debug("agg_test, before flush back: " + elem.ID().String() + ", Type: " + string(elem.Type()))
 
 		if elem.Consume(
 			beforeNanos,
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
index e2972c1dcb..aa768275e5 100644
--- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
+++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
@@ -128,7 +128,7 @@ func (h *pbHandler) Process(msg consumer.Message) {
 		}
 	}
 
-	if strings.Contains(string(dec.ID()), "node_network_transmit_queue_length:") {
+	if strings.Contains(string(dec.ID()), "rulemanager_latest_rule_evaluation_time") {
 		out, _ := json.Marshal(sp)
 		h.logger.Debug("agg_test, process message:" + string(out) + ": metrics:" + string(dec.ID()))
 	}