Skip to content

Commit

Permalink
Fix for #5620
Browse files Browse the repository at this point in the history
Signed-off-by: Anand Rajagopal <[email protected]>
  • Loading branch information
rajagopalanand committed Nov 1, 2023
1 parent 1dfd5f9 commit c0159e5
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [BUGFIX] Querier: Share the query limiter across all selects within a single query so per-query fetch limits are enforced cumulatively (https://github.com/cortexproject/cortex/issues/5620). #5627
* [CHANGE] Ruler: Add `cortex_ruler_rule_group_load_duration_seconds` and `cortex_ruler_rule_group_sync_duration_seconds` metrics. #5609
* [CHANGE] Ruler: Add contextual info and query statistics to log
* [FEATURE] Ruler: Add support for disabling rule groups. #5521
Expand Down
13 changes: 12 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ type QueryableWithFilter interface {
UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool
}

// limiterHolder lazily holds the single QueryLimiter shared by every select
// issued for one query, so that per-query fetch limits are enforced
// cumulatively across selects rather than reset per select.
type limiterHolder struct {
// limiter is created on first use and then reused for the lifetime of the querier.
limiter *limiter.QueryLimiter
// limiterInitializer guarantees the limiter is constructed exactly once,
// even when selects run concurrently.
limiterInitializer sync.Once
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader purger.TombstonesLoader) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
Expand All @@ -256,6 +261,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
queryStoreForLabels: cfg.QueryStoreForLabels,
distributor: distributor,
stores: stores,
limiterHolder: &limiterHolder{},
}

return q, nil
Expand All @@ -273,6 +279,7 @@ type querier struct {
queryStoreForLabels bool
distributor QueryableWithFilter
stores []QueryableWithFilter
limiterHolder *limiterHolder
}

func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int64, int64, storage.Querier, []storage.Querier, error) {
Expand All @@ -281,7 +288,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int
return ctx, userID, 0, 0, nil, nil, err
}

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID)))
q.limiterHolder.limiterInitializer.Do(func() {
q.limiterHolder.limiter = limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID))
})

ctx = limiter.AddQueryLimiterToContext(ctx, q.limiterHolder.limiter)

mint, maxt, err := validateQueryTimeRange(ctx, userID, q.mint, q.maxt, q.limits, q.maxQueryIntoFuture)
if err != nil {
Expand Down
195 changes: 195 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,201 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
}
}

// tenantLimit is a test implementation of validation.TenantLimits that applies
// the same series limit to every tenant.
type tenantLimit struct {
// MaxFetchedSeriesPerQuery is the per-query series limit returned for all tenants.
MaxFetchedSeriesPerQuery int
}

// ByUserID returns per-tenant limit overrides for the given user. Only the
// maximum fetched series per query is set; every other limit keeps its zero value.
func (t tenantLimit) ByUserID(userID string) *validation.Limits {
	overrides := validation.Limits{
		MaxFetchedSeriesPerQuery: t.MaxFetchedSeriesPerQuery,
	}
	return &overrides
}

// AllByUserID returns the full per-tenant limits map. Only tenant "0" is
// present, mapped to the default limits configuration.
func (t tenantLimit) AllByUserID() map[string]*validation.Limits {
	def := DefaultLimitsConfig()
	all := make(map[string]*validation.Limits, 1)
	all["0"] = &def
	return all
}

// TestLimits verifies that a query containing multiple selects shares one
// query limiter, so the MaxFetchedSeriesPerQuery limit is enforced
// cumulatively across selects instead of being reset for each select.
func TestLimits(t *testing.T) {
t.Parallel()
start := time.Now().Add(-2 * time.Hour)
end := time.Now()
// All queries run as tenant "0", matching tenantLimit.AllByUserID.
ctx := user.InjectOrgID(context.Background(), "0")
var cfg Config
flagext.DefaultValues(&cfg)

const chunks = 1

// Five series over two metric names; the distributor mock matches series by
// label value, so "foo" selects three series and "bar" selects two.
labelsSets := []labels.Labels{
{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "order", Value: "1"},
},
{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "order", Value: "2"},
},
{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "orders", Value: "3"},
},
{
{Name: model.MetricNameLabel, Value: "bar"},
{Name: "orders", Value: "4"},
},
{
{Name: model.MetricNameLabel, Value: "bar"},
{Name: "orders", Value: "5"},
},
}

_, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))

// Canned distributor response; series are deliberately out of order so the
// querier's sorting/dedup path is exercised as well.
unorderedResponse := client.QueryStreamResponse{
Timeseries: []cortexpb.TimeSeries{
{
Labels: []cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "order", Value: "2"},
},
Samples: samples,
},
{
Labels: []cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "order", Value: "1"},
},
Samples: samples,
},
{
Labels: []cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "orders", Value: "3"},
},
Samples: samples,
},
{
Labels: []cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "bar"},
{Name: "orders", Value: "2"},
},
Samples: samples,
},
{
Labels: []cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "bar"},
{Name: "orders", Value: "1"},
},
Samples: samples,
},
},
}

// MockLimitingDistributor consults the query limiter from the context,
// which is what makes the shared-limiter behavior observable here.
distributor := &MockLimitingDistributor{
response: &unorderedResponse,
}

distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

// Each case runs one query that fans out into multiple selects (the
// distributor queryable is also registered as a store queryable) and asserts
// whether the cumulative series count trips MaxFetchedSeriesPerQuery.
tCases := []struct {
name string
description string
distributorQueryable QueryableWithFilter
storeQueriables []QueryableWithFilter
tenantLimit validation.TenantLimits
query string
assert func(t *testing.T, r *promql.Result)
}{
{

name: "should result in limit failure for multi-select and an individual select hits the series limit",
description: "query results in multi-select but duplicate finger prints get deduped but still results in # of series greater than limit",
query: "sum_over_time(foo[1d:10s]) + avg_over_time(foo[1d:10s])",
distributorQueryable: distributorQueryableStreaming,
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
tenantLimit: &tenantLimit{
MaxFetchedSeriesPerQuery: 2,
},
assert: func(t *testing.T, r *promql.Result) {
require.Error(t, r.Err)
},
},
{

name: "should not result in limit failure for multi-select and the query does not hit the series limit",
description: "query results in multi-select but duplicate series finger prints get deduped resulting in # of series within the limit",
query: "sum_over_time(foo[1d:10s]) + avg_over_time(foo[1d:10s])",
distributorQueryable: distributorQueryableStreaming,
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
tenantLimit: &tenantLimit{
MaxFetchedSeriesPerQuery: 3,
},
assert: func(t *testing.T, r *promql.Result) {
require.NoError(t, r.Err)
},
},
{

name: "should result in limit failure for multi-select and query hits the series limit",
description: "query results in multi-select but each individual select does not hit the limit but cumulatively the query hits the limit",
query: "sum_over_time(foo[1d:10s]) + avg_over_time(bar[1d:10s])",
distributorQueryable: distributorQueryableStreaming,
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
tenantLimit: &tenantLimit{
MaxFetchedSeriesPerQuery: 3,
},
assert: func(t *testing.T, r *promql.Result) {
require.Error(t, r.Err)
},
},
{

name: "should not result in limit failure for multi-select and query does not hit the series limit",
description: "query results in multi-select and the cumulative limit is >= series",
query: "sum_over_time(foo[1d:10s]) + avg_over_time(bar[1d:10s])",
distributorQueryable: distributorQueryableStreaming,
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
tenantLimit: &tenantLimit{
MaxFetchedSeriesPerQuery: 5,
},
assert: func(t *testing.T, r *promql.Result) {
require.NoError(t, r.Err)
},
},
}

for i, tc := range tCases {
t.Run(tc.name+fmt.Sprintf(", Test: %d", i), func(t *testing.T) {
wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
var wQueriables []QueryableWithFilter
for _, queriable := range tc.storeQueriables {
wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable})
}
// Per-tenant overrides carry the case's MaxFetchedSeriesPerQuery.
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit)
require.NoError(t, err)

queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader())
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Timeout: 1 * time.Minute,
}

queryEngine := promql.NewEngine(opts)

query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, tc.query, start, end, 1*time.Minute)
require.NoError(t, err)

r := query.Exec(ctx)

// The case decides whether the limiter error is expected.
tc.assert(t, r)
})
}
}

func TestQuerier(t *testing.T) {
t.Parallel()
var cfg Config
Expand Down
44 changes: 44 additions & 0 deletions pkg/querier/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package querier

import (
"context"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/limiter"
"sync"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -60,6 +63,47 @@ func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricM
return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
}

// MockLimitingDistributor is a MockDistributor whose QueryStream enforces the
// per-query series limit found in the request context, returning series from a
// canned response.
type MockLimitingDistributor struct {
MockDistributor
// response is the fixed set of series to filter and return from QueryStream.
response *client.QueryStreamResponse
// rMutex appears unused in the visible QueryStream path — presumably reserved
// for guarding response in concurrent tests; TODO confirm against other callers.
rMutex sync.Mutex
}

// QueryStream returns the subset of the canned response whose series carry at
// least one label value matched by one of the given matchers, after charging
// the matched series against the query limiter taken from the context.
// Note this mock matches label values only (label names are ignored).
func (m *MockLimitingDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
	queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)

	matched := make([][]cortexpb.LabelAdapter, 0, len(m.response.Chunkseries)+len(m.response.Timeseries))
	result := &client.QueryStreamResponse{}

	for _, cs := range m.response.Chunkseries {
		for _, lbl := range cs.Labels {
			for _, matcher := range matchers {
				if matcher.Matches(lbl.Value) {
					matched = append(matched, cs.Labels)
					result.Chunkseries = append(result.Chunkseries, cs)
				}
			}
		}
	}

	for _, ts := range m.response.Timeseries {
		for _, lbl := range ts.Labels {
			for _, matcher := range matchers {
				if matcher.Matches(lbl.Value) {
					matched = append(matched, ts.Labels)
					result.Timeseries = append(result.Timeseries, ts)
				}
			}
		}
	}

	// Charge every matched series against the shared per-query limiter; a
	// limit breach surfaces as a validation.LimitError to the caller.
	if limitErr := queryLimiter.AddSeries(matched...); limitErr != nil {
		return nil, validation.LimitError(limitErr.Error())
	}
	return result, nil
}

type TestConfig struct {
Cfg Config
Distributor Distributor
Expand Down

0 comments on commit c0159e5

Please sign in to comment.