diff --git a/CHANGELOG.md b/CHANGELOG.md
index 55348abae9..41cf15a0bd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [BUGFIX] Querier: Fix querier limiter bug under multiselect. #5627
 * [CHANGE] Ruler: Add `cortex_ruler_rule_group_load_duration_seconds` and `cortex_ruler_rule_group_sync_duration_seconds` metrics. #5609
 * [CHANGE] Ruler: Add contextual info and query statistics to log
 * [FEATURE] Ruler: Add support for disabling rule groups. #5521
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 1455dbe37e..df13fb7ce6 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -242,6 +242,11 @@ type QueryableWithFilter interface {
 	UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool
 }
 
+type limiterHolder struct {
+	limiter            *limiter.QueryLimiter
+	limiterInitializer sync.Once
+}
+
 // NewQueryable creates a new Queryable for cortex.
 func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader purger.TombstonesLoader) storage.Queryable {
 	return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
@@ -256,6 +261,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
 			queryStoreForLabels: cfg.QueryStoreForLabels,
 			distributor:         distributor,
 			stores:              stores,
+			limiterHolder:       &limiterHolder{},
 		}
 
 		return q, nil
@@ -273,6 +279,7 @@ type querier struct {
 	queryStoreForLabels bool
 	distributor         QueryableWithFilter
 	stores              []QueryableWithFilter
+	limiterHolder       *limiterHolder
 }
 
 func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int64, int64, storage.Querier, []storage.Querier, error) {
@@ -281,7 +288,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int
 		return ctx, userID, 0, 0, nil, nil, err
 	}
 
-	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID)))
+	q.limiterHolder.limiterInitializer.Do(func() {
+		q.limiterHolder.limiter = limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID))
+	})
+
+	ctx = limiter.AddQueryLimiterToContext(ctx, q.limiterHolder.limiter)
 
 	mint, maxt, err := validateQueryTimeRange(ctx, userID, q.mint, q.maxt, q.limits, q.maxQueryIntoFuture)
 	if err != nil {
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 9d1882257a..ed3d6abc29 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -311,6 +311,201 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
 	}
 }
 
+type tenantLimit struct {
+	MaxFetchedSeriesPerQuery int
+}
+
+func (t tenantLimit) ByUserID(userID string) *validation.Limits {
+	return &validation.Limits{
+		MaxFetchedSeriesPerQuery: t.MaxFetchedSeriesPerQuery,
+	}
+}
+
+func (t tenantLimit) AllByUserID() map[string]*validation.Limits {
+	defaults := DefaultLimitsConfig()
+	return map[string]*validation.Limits{
+		"0": &defaults,
+	}
+}
+
+// By testing limits, we are ensuring queries with multiple selects share the query limiter
+func TestLimits(t *testing.T) {
+	t.Parallel()
+	start := time.Now().Add(-2 * time.Hour)
+	end := time.Now()
+	ctx := user.InjectOrgID(context.Background(), "0")
+	var cfg Config
+	flagext.DefaultValues(&cfg)
+
+	const chunks = 1
+
+	labelsSets := []labels.Labels{
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "order", Value: "1"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "order", Value: "2"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "orders", Value: "3"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "bar"},
+			{Name: "orders", Value: "4"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "bar"},
+			{Name: "orders", Value: "5"},
+		},
+	}
+
+	_, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
+
+	streamResponse := client.QueryStreamResponse{
+		Timeseries: []cortexpb.TimeSeries{
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "order", Value: "2"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "order", Value: "1"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "orders", Value: "3"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "bar"},
+					{Name: "orders", Value: "2"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "bar"},
+					{Name: "orders", Value: "1"},
+				},
+				Samples: samples,
+			},
+		},
+	}
+
+	distributor := &MockLimitingDistributor{
+		response: &streamResponse,
+	}
+
+	distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
+
+	tCases := []struct {
+		name                 string
+		description          string
+		distributorQueryable QueryableWithFilter
+		storeQueriables      []QueryableWithFilter
+		tenantLimit          validation.TenantLimits
+		query                string
+		assert               func(t *testing.T, r *promql.Result)
+	}{
+		{
+
+			name:                 "should result in limit failure for multi-select and an individual select hits the series limit",
+			description:          "query results in multi-select but duplicate finger prints get deduped but still results in # of series greater than limit",
+			query:                "foo + foo",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 2,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.Error(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should not result in limit failure for multi-select and the query does not hit the series limit",
+			description:          "query results in multi-select but duplicate series finger prints get deduped resulting in # of series within the limit",
+			query:                "foo + foo",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 3,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.NoError(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should result in limit failure for multi-select and query hits the series limit",
+			description:          "query results in multi-select but each individual select does not hit the limit but cumulatively the query hits the limit",
+			query:                "foo + bar",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 3,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.Error(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should not result in limit failure for multi-select and query does not hit the series limit",
+			description:          "query results in multi-select and the cumulative limit is >= series",
+			query:                "foo + bar",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 5,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.NoError(t, r.Err)
+			},
+		},
+	}
+
+	for i, tc := range tCases {
+		t.Run(tc.name+fmt.Sprintf(", Test: %d", i), func(t *testing.T) {
+			wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
+			var wQueriables []QueryableWithFilter
+			for _, queriable := range tc.storeQueriables {
+				wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable})
+			}
+			overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit)
+			require.NoError(t, err)
+
+			queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader())
+			opts := promql.EngineOpts{
+				Logger:     log.NewNopLogger(),
+				MaxSamples: 1e6,
+				Timeout:    1 * time.Minute,
+			}
+
+			queryEngine := promql.NewEngine(opts)
+
+			query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, tc.query, start, end, 1*time.Minute)
+			require.NoError(t, err)
+
+			r := query.Exec(ctx)
+
+			tc.assert(t, r)
+		})
+	}
+}
+
 func TestQuerier(t *testing.T) {
 	t.Parallel()
 	var cfg Config
diff --git a/pkg/querier/testutils.go b/pkg/querier/testutils.go
index 2489f712a0..650d93f727 100644
--- a/pkg/querier/testutils.go
+++ b/pkg/querier/testutils.go
@@ -2,7 +2,8 @@ package querier
 
 import (
 	"context"
-
+	"github.com/cortexproject/cortex/pkg/cortexpb"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/scrape"
@@ -60,6 +61,46 @@ func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricM
 	return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
 }
 
+type MockLimitingDistributor struct {
+	MockDistributor
+	response *client.QueryStreamResponse
+}
+
+func (m *MockLimitingDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
+	var (
+		queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
+	)
+	s := make([][]cortexpb.LabelAdapter, 0, len(m.response.Chunkseries)+len(m.response.Timeseries))
+
+	response := &client.QueryStreamResponse{}
+	for _, series := range m.response.Chunkseries {
+		for _, label := range series.Labels {
+			for _, matcher := range matchers {
+				if matcher.Matches(label.Value) {
+					s = append(s, series.Labels)
+					response.Chunkseries = append(response.Chunkseries, series)
+				}
+			}
+		}
+	}
+
+	for _, series := range m.response.Timeseries {
+		for _, label := range series.Labels {
+			for _, matcher := range matchers {
+				if matcher.Matches(label.Value) {
+					s = append(s, series.Labels)
+					response.Timeseries = append(response.Timeseries, series)
+				}
+			}
+		}
+	}
+
+	if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
+		return nil, validation.LimitError(limitErr.Error())
+	}
+	return response, nil
+}
+
 type TestConfig struct {
 	Cfg         Config
 	Distributor Distributor
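Reviewer note: the change works because every Select issued for a single query now draws from one lazily created QueryLimiter (guarded by sync.Once inside limiterHolder) instead of stamping a fresh limiter into the context on each setupFromCtx call; the lazy initialization appears to be needed because the per-tenant limits are keyed by the userID, which is only available from the request context. Below is a minimal, self-contained Go sketch of that shared-limiter pattern under illustrative names (queryLimiter, limiterHolder, selectSeries are not the Cortex APIs), not the actual implementation.

// Sketch only: shows how a sync.Once-guarded holder makes several selects
// within one query consume a single, cumulative series budget.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// queryLimiter tracks how many unique series one query has fetched so far.
type queryLimiter struct {
	mu        sync.Mutex
	maxSeries int
	seen      map[string]struct{}
}

func newQueryLimiter(maxSeries int) *queryLimiter {
	return &queryLimiter{maxSeries: maxSeries, seen: map[string]struct{}{}}
}

// addSeries deduplicates series and fails once the cumulative count exceeds the limit.
func (l *queryLimiter) addSeries(series ...string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, s := range series {
		l.seen[s] = struct{}{}
		if len(l.seen) > l.maxSeries {
			return errors.New("max series per query exceeded")
		}
	}
	return nil
}

// limiterHolder mirrors the shape added in the diff: the limiter is created
// at most once, no matter how many selects the query fans out into.
type limiterHolder struct {
	once    sync.Once
	limiter *queryLimiter
}

func (h *limiterHolder) get(maxSeries int) *queryLimiter {
	h.once.Do(func() { h.limiter = newQueryLimiter(maxSeries) })
	return h.limiter
}

// selectSeries stands in for one Select call issued by the PromQL engine.
func selectSeries(h *limiterHolder, series []string) error {
	return h.get(3).addSeries(series...) // per-tenant limit of 3 series, chosen for illustration
}

func main() {
	holder := &limiterHolder{} // one holder per query, shared by all of its selects

	// A query such as `foo + bar` issues two selects; their series counts
	// must accumulate in the same limiter.
	if err := selectSeries(holder, []string{"foo{order=1}", "foo{order=2}"}); err != nil {
		fmt.Println("first select:", err)
	}
	if err := selectSeries(holder, []string{"bar{order=4}", "bar{order=5}"}); err != nil {
		fmt.Println("second select:", err) // cumulative count is 4 > 3, so this fails
	}
}

Running the sketch, the second select fails even though neither select exceeds the limit on its own, which mirrors the "foo + bar" cumulative-limit test case added in querier_test.go.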