Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query sampling for checking #7

Open
wants to merge 1 commit into
base: db_obs_m3coord_cache
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/cache"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/storage/m3/storagemetadata"
Expand Down Expand Up @@ -146,8 +147,8 @@ type Configuration struct {
// ListenAddress is the server listen address.
ListenAddress *string `yaml:"listenAddress"`

// RedisAddress is the Redis address for caching.
RedisCacheAddress *string `yaml:"redisCacheAddress"`
// Specifies the settings for Redis cache-enabled coordinators
RedisCacheSpec *cache.RedisCacheSpec `yaml:"redisCacheSpec"`

// Filter is the read/write/complete tags filter configuration.
Filter FilterConfiguration `yaml:"filter"`
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func withEngine(promQLEngineFn options.PromQLEngineFn, instant bool) Option {
func newDefaultOptions(hOpts options.HandlerOptions) opts {
// Check if Redis Address exists, otherwise let it be ""
redisAddress := ""
if hOpts.Config().RedisCacheAddress != nil {
redisAddress = *hOpts.Config().RedisCacheAddress
if hOpts.Config().RedisCacheSpec != nil && hOpts.Config().RedisCacheSpec.RedisCacheAddress != "" {
redisAddress = hOpts.Config().RedisCacheSpec.RedisCacheAddress
}
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Expand Down
95 changes: 95 additions & 0 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ package prom
import (
"context"
"errors"
"math"
"math/rand"
"net/http"
"sort"
"sync"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
Expand Down Expand Up @@ -84,14 +87,67 @@ var (
params.Now)
}
}

// Ratio of queries we make a check for
DefaultCheckSampleRate float64 = 0.0
// Threshold in % to determine if there's difference in results (1 means 1% diff)
DefaultComparePercentThreshold float64 = 1.0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1% difference cross all buckets?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it checks the final aggregated result, so it's a 1% difference in the final result

)

// Compares results a, b to the specified percent threshold
// Results should be vectors
func compareResults(a, b *promql.Result, threshold float64) bool {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What may cause the value different?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there can be some slight floating point precision errors especially with comparison, so I thought a % threshold would be best

if a.Value.Type() != parser.ValueTypeVector || b.Value.Type() != parser.ValueTypeVector {
return false
}
v1 := a.Value.(promql.Vector)
v2 := b.Value.(promql.Vector)

if len(v1) != len(v2) {
return false
} else {
sort.Slice(v1, func(i, j int) bool {
return v1[i].Metric.String() < v1[j].Metric.String()
})
sort.Slice(v2, func(i, j int) bool {
return v2[i].Metric.String() < v2[j].Metric.String()
})

for i := range v1 {
if v1[i].Metric.String() != v2[i].Metric.String() {
return false
}
if v1[i].Point.V == 0 && v2[i].Point.V != 0 {
return false
}
percent_diff := math.Abs(v1[i].Point.V-v2[i].Point.V) / v1[i].Point.V * 100
if percent_diff > threshold {
return false
}
}
}
return true
}

type queryCheckMetrics struct {
queryCheckMismatch tally.Counter
queryCheckTotal tally.Counter
}

type queryCheckConfig struct {
CheckSampleRate float64
ComparePercentThreshold float64
}

type readHandler struct {
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts opts
returnedDataMetrics native.PromReadReturnedDataMetrics

queryCheckConfig queryCheckConfig
queryCheckMetrics queryCheckMetrics
}

func newReadHandler(
Expand All @@ -101,12 +157,29 @@ func newReadHandler(
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)
queryCheckMetrics := queryCheckMetrics{
queryCheckMismatch: scope.Counter("query_check_mismatches"),
queryCheckTotal: scope.Counter("query_check_total"),
}
checkSampleRate := DefaultCheckSampleRate
comparePercentThreshold := DefaultComparePercentThreshold
// If specified, use config, otherwise default
if hOpts.Config().RedisCacheSpec != nil {
checkSampleRate = hOpts.Config().RedisCacheSpec.CheckSampleRate
comparePercentThreshold = hOpts.Config().RedisCacheSpec.ComparePercentThreshold
}
queryCheckConfig := queryCheckConfig{
CheckSampleRate: checkSampleRate,
ComparePercentThreshold: comparePercentThreshold,
}
return &readHandler{
hOpts: hOpts,
opts: options,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
returnedDataMetrics: native.NewPromReadReturnedDataMetrics(scope),
queryCheckMetrics: queryCheckMetrics,
queryCheckConfig: queryCheckConfig,
}, nil
}

Expand Down Expand Up @@ -149,6 +222,7 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer qry.Close()

res := qry.Exec(ctx)
// h.logger.Info("final result", zap.String("result", res.Value.String()), zap.Float64("base", CheckSampleRate))
if res.Err != nil {
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
Expand Down Expand Up @@ -177,6 +251,27 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// Rulemanager results are vector values (list of metric + value)
// Take a random number and check if under rate so we check a proportion of the queries
if rand.Float64() < float64(h.queryCheckConfig.CheckSampleRate) && res.Value.Type() == parser.ValueTypeVector {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal, but do we need float64, why not float32?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this check into redis_cache.go? Put this a this section code into root HTTP method is exposing the storage/cache knowledge to upper layer which is not good.

query, err := h.opts.newQueryFn(params)
if err != nil {
h.logger.Error("Comparison query failed to create")
}
defer query.Close()
// Set context so we can default to M3DB later on
result := query.Exec(context.WithValue(ctx, "UseM3DB", true))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename name to m3dbQueryResult to avoid confusion between res and result

if result.Err != nil {
h.logger.Error("Comparison query failed to execute")
} else {
if result != nil && !compareResults(res, result, h.queryCheckConfig.ComparePercentThreshold) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know this res if queried from cache instead of m3db if cache miss the hit?

h.queryCheckMetrics.queryCheckMismatch.Inc(1)
h.logger.Info("mismatch", zap.String("query", qry.String()))
}
h.queryCheckMetrics.queryCheckTotal.Inc(1)
}
}

for _, warn := range resultMetadata.Warnings {
res.Warnings = append(res.Warnings, errors.New(warn.Message))
}
Expand Down
14 changes: 12 additions & 2 deletions src/query/storage/cache/redis_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ const (
CreateAfterTime time.Duration = 100 * time.Millisecond
)

type RedisCacheSpec struct {
// RedisAddress is the Redis address for caching.
RedisCacheAddress string `yaml:"redisCacheAddress"`

// The % of queries we check against the result using only M3DB
CheckSampleRate float64 `yaml:"checkSampleRate"`

// The % diff threshold to declare inequality
ComparePercentThreshold float64 `yaml:"comparePercentThreshold"`
}

type RedisCache struct {
client radix.Client
redisAddress string
Expand Down Expand Up @@ -94,13 +105,12 @@ func NewRedisCache(redisAddress string, logger *zap.Logger, scope tally.Scope) *
return nil
}
logger.Info("Connection to Redis established", zap.String("address", redisAddress))
cache := &RedisCache{
return &RedisCache{
client: pool,
redisAddress: redisAddress,
logger: logger,
cacheMetrics: NewCacheMetrics(scope),
}
return cache
}

// Given a fetch query, converts it into a key for Redis
Expand Down
13 changes: 10 additions & 3 deletions src/query/storage/prometheus/prometheus_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,16 @@ func (q *querier) Select(
}

// result, err := q.storage.FetchProm(q.ctx, query, fetchOptions)
result, err := cache.WindowGetOrFetch(q.ctx, q.storage, fetchOptions, query, q.cache)
if err != nil {
return promstorage.ErrSeriesSet(NewStorageErr(err))
useM3DB := q.ctx.Value("UseM3DB")
var result storage.PromResult
var e error
if useM3DB != nil && useM3DB.(bool) {
result, e = q.storage.FetchProm(q.ctx, query, fetchOptions)
} else {
result, e = cache.WindowGetOrFetch(q.ctx, q.storage, fetchOptions, query, q.cache)
}
if e != nil {
return promstorage.ErrSeriesSet(NewStorageErr(e))
}
seriesSet := fromQueryResult(sortSeries, result.PromResult, result.Metadata)

Expand Down