diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 29ae366405..7236a9a419 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/cache" "github.com/m3db/m3/src/query/storage/m3" "github.com/m3db/m3/src/query/storage/m3/consolidators" "github.com/m3db/m3/src/query/storage/m3/storagemetadata" @@ -146,8 +147,8 @@ type Configuration struct { // ListenAddress is the server listen address. ListenAddress *string `yaml:"listenAddress"` - // RedisAddress is the Redis address for caching. - RedisCacheAddress *string `yaml:"redisCacheAddress"` + // Specifies the settings for Redis cache-enabled coordinators + RedisCacheSpec *cache.RedisCacheSpec `yaml:"redisCacheSpec"` // Filter is the read/write/complete tags filter configuration. Filter FilterConfiguration `yaml:"filter"` diff --git a/src/query/api/v1/handler/prom/prom.go b/src/query/api/v1/handler/prom/prom.go index 7be2cc39f2..96938dbb33 100644 --- a/src/query/api/v1/handler/prom/prom.go +++ b/src/query/api/v1/handler/prom/prom.go @@ -69,8 +69,8 @@ func withEngine(promQLEngineFn options.PromQLEngineFn, instant bool) Option { func newDefaultOptions(hOpts options.HandlerOptions) opts { // Check if Redis Address exists, otherwise let it be "" redisAddress := "" - if hOpts.Config().RedisCacheAddress != nil { - redisAddress = *hOpts.Config().RedisCacheAddress + if hOpts.Config().RedisCacheSpec != nil && hOpts.Config().RedisCacheSpec.RedisCacheAddress != "" { + redisAddress = hOpts.Config().RedisCacheSpec.RedisCacheAddress } queryable := prometheus.NewPrometheusQueryable( prometheus.PrometheusOptions{ diff --git a/src/query/api/v1/handler/prom/read.go b/src/query/api/v1/handler/prom/read.go index ddb46d41bd..b91b861a98 100644 --- a/src/query/api/v1/handler/prom/read.go +++ b/src/query/api/v1/handler/prom/read.go @@ -25,7 +25,10 @@ package prom import ( "context" "errors" + "math" + "math/rand" "net/http" + "sort" "sync" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" @@ -84,14 +87,67 @@ var ( params.Now) } } + + // Ratio of queries we make a check for + DefaultCheckSampleRate float64 = 0.0 + // Threshold in % to determine if there's difference in results (1 means 1% diff) + DefaultComparePercentThreshold float64 = 1.0 ) +// Compares results a, b to the specified percent threshold +// Results should be vectors +func compareResults(a, b *promql.Result, threshold float64) bool { + if a.Value.Type() != parser.ValueTypeVector || b.Value.Type() != parser.ValueTypeVector { + return false + } + v1 := a.Value.(promql.Vector) + v2 := b.Value.(promql.Vector) + + if len(v1) != len(v2) { + return false + } else { + sort.Slice(v1, func(i, j int) bool { + return v1[i].Metric.String() < v1[j].Metric.String() + }) + sort.Slice(v2, func(i, j int) bool { + return v2[i].Metric.String() < v2[j].Metric.String() + }) + + for i := range v1 { + if v1[i].Metric.String() != v2[i].Metric.String() { + return false + } + if v1[i].Point.V == 0 && v2[i].Point.V != 0 { + return false + } + percent_diff := math.Abs(v1[i].Point.V-v2[i].Point.V) / v1[i].Point.V * 100 + if percent_diff > threshold { + return false + } + } + } + return true +} + +type queryCheckMetrics struct { + queryCheckMismatch tally.Counter + queryCheckTotal tally.Counter +} + +type queryCheckConfig struct { + CheckSampleRate float64 + ComparePercentThreshold float64 +} + type readHandler struct { hOpts options.HandlerOptions scope tally.Scope logger *zap.Logger opts opts returnedDataMetrics native.PromReadReturnedDataMetrics + + queryCheckConfig queryCheckConfig + queryCheckMetrics queryCheckMetrics } func newReadHandler( @@ -101,12 +157,29 @@ func newReadHandler( scope := hOpts.InstrumentOpts().MetricsScope().Tagged( map[string]string{"handler": "prometheus-read"}, ) + queryCheckMetrics := queryCheckMetrics{ + queryCheckMismatch: scope.Counter("query_check_mismatches"), + queryCheckTotal: scope.Counter("query_check_total"), + } + checkSampleRate := DefaultCheckSampleRate + comparePercentThreshold := DefaultComparePercentThreshold + // If specified, use config, otherwise default + if hOpts.Config().RedisCacheSpec != nil { + checkSampleRate = hOpts.Config().RedisCacheSpec.CheckSampleRate + comparePercentThreshold = hOpts.Config().RedisCacheSpec.ComparePercentThreshold + } + queryCheckConfig := queryCheckConfig{ + CheckSampleRate: checkSampleRate, + ComparePercentThreshold: comparePercentThreshold, + } return &readHandler{ hOpts: hOpts, opts: options, scope: scope, logger: hOpts.InstrumentOpts().Logger(), returnedDataMetrics: native.NewPromReadReturnedDataMetrics(scope), + queryCheckMetrics: queryCheckMetrics, + queryCheckConfig: queryCheckConfig, }, nil } @@ -149,6 +222,7 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer qry.Close() res := qry.Exec(ctx) + // h.logger.Info("final result", zap.String("result", res.Value.String()), zap.Float64("base", CheckSampleRate)) if res.Err != nil { h.logger.Error("error executing query", zap.Error(res.Err), zap.String("query", params.Query), @@ -177,6 +251,27 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // Rulemanager results are vector values (list of metric + value) + // Take a random number and check if under rate so we check a proportion of the queries + if rand.Float64() < float64(h.queryCheckConfig.CheckSampleRate) && res.Value.Type() == parser.ValueTypeVector { + query, err := h.opts.newQueryFn(params) + if err != nil { + h.logger.Error("Comparison query failed to create") + } + defer query.Close() + // Set context so we can default to M3DB later on + result := query.Exec(context.WithValue(ctx, "UseM3DB", true)) + if result.Err != nil { + h.logger.Error("Comparison query failed to execute") + } else { + if result != nil && !compareResults(res, result, h.queryCheckConfig.ComparePercentThreshold) { + h.queryCheckMetrics.queryCheckMismatch.Inc(1) + h.logger.Info("mismatch", zap.String("query", qry.String())) + } + h.queryCheckMetrics.queryCheckTotal.Inc(1) + } + } + for _, warn := range resultMetadata.Warnings { res.Warnings = append(res.Warnings, errors.New(warn.Message)) } diff --git a/src/query/storage/cache/redis_cache.go b/src/query/storage/cache/redis_cache.go index f7ea482143..7affd674a9 100644 --- a/src/query/storage/cache/redis_cache.go +++ b/src/query/storage/cache/redis_cache.go @@ -27,6 +27,17 @@ const ( CreateAfterTime time.Duration = 100 * time.Millisecond ) +type RedisCacheSpec struct { + // RedisAddress is the Redis address for caching. + RedisCacheAddress string `yaml:"redisCacheAddress"` + + // The % of queries we check against the result using only M3DB + CheckSampleRate float64 `yaml:"checkSampleRate"` + + // The % diff threshold to declare inequality + ComparePercentThreshold float64 `yaml:"comparePercentThreshold"` +} + type RedisCache struct { client radix.Client redisAddress string @@ -94,13 +105,12 @@ func NewRedisCache(redisAddress string, logger *zap.Logger, scope tally.Scope) * return nil } logger.Info("Connection to Redis established", zap.String("address", redisAddress)) - cache := &RedisCache{ + return &RedisCache{ client: pool, redisAddress: redisAddress, logger: logger, cacheMetrics: NewCacheMetrics(scope), } - return cache } // Given a fetch query, converts it into a key for Redis diff --git a/src/query/storage/prometheus/prometheus_storage.go b/src/query/storage/prometheus/prometheus_storage.go index f20303ae9f..d4cddc12eb 100644 --- a/src/query/storage/prometheus/prometheus_storage.go +++ b/src/query/storage/prometheus/prometheus_storage.go @@ -145,9 +145,16 @@ func (q *querier) Select( } // result, err := q.storage.FetchProm(q.ctx, query, fetchOptions) - result, err := cache.WindowGetOrFetch(q.ctx, q.storage, fetchOptions, query, q.cache) - if err != nil { - return promstorage.ErrSeriesSet(NewStorageErr(err)) + useM3DB := q.ctx.Value("UseM3DB") + var result storage.PromResult + var e error + if useM3DB != nil && useM3DB.(bool) { + result, e = q.storage.FetchProm(q.ctx, query, fetchOptions) + } else { + result, e = cache.WindowGetOrFetch(q.ctx, q.storage, fetchOptions, query, q.cache) + } + if e != nil { + return promstorage.ErrSeriesSet(NewStorageErr(e)) } seriesSet := fromQueryResult(sortSeries, result.PromResult, result.Metadata)