diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 0460947724..d320ce12f6 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -358,6 +358,7 @@ func runReceive( options := []store.ProxyStoreOption{ store.WithProxyStoreDebugLogging(debugLogging), store.WithoutDedup(), + store.WithMatcherConverter(conf.matcherConverterCacheCapacity, reg), } proxy := store.NewProxyStore( @@ -932,9 +933,10 @@ type receiveConfig struct { asyncForwardWorkerCount uint - numTopMetricsPerTenant int - topMetricsMinimumCardinality uint64 - topMetricsUpdateInterval time.Duration + numTopMetricsPerTenant int + topMetricsMinimumCardinality uint64 + topMetricsUpdateInterval time.Duration + matcherConverterCacheCapacity int featureList *[]string } @@ -1097,6 +1099,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { Default("10000").Uint64Var(&rc.topMetricsMinimumCardinality) cmd.Flag("receive.top-metrics-update-interval", "The interval at which the top metrics are updated."). Default("5m").DurationVar(&rc.topMetricsUpdateInterval) + cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API."). + Default("1000").IntVar(&rc.matcherConverterCacheCapacity) rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 488bcf1cde..2d92ba8c91 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -99,6 +99,7 @@ type ProxyStore struct { tsdbSelector *TSDBSelector quorumChunkDedup bool enableDedup bool + matcherConverter *storepb.MatcherConverter } type proxyStoreMetrics struct { @@ -162,6 +163,18 @@ func WithoutDedup() ProxyStoreOption { } } +func WithMatcherConverter(cacheCapacity int, reg prometheus.Registerer) ProxyStoreOption { + return func(s *ProxyStore) { + matcherConverter, err := storepb.NewMatcherConverter(cacheCapacity, reg) + if err != nil { + level.Error(s.logger).Log("msg", "failed to create matcher converter", "err", err) + return + } + level.Info(s.logger).Log("msg", "created matcher converter", "cache_capacity", cacheCapacity) + s.matcherConverter = matcherConverter + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 650074c9d6..3c65de3563 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -7,11 +7,14 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "sort" "strconv" "strings" "github.com/gogo/protobuf/types" + cache "github.com/hashicorp/golang-lru/v2" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc/codes" @@ -381,31 +384,107 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { return res, nil } +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + pm, err := labels.NewMatcher(t, m.Name, m.Value) + if err != nil { + return nil, err + } + return pm, nil +} + // MatchersToPromMatchers returns Prometheus matchers from proto matchers. // NOTE: It allocates memory. func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) for _, m := range ms { - var t labels.MatchType + m, err := MatcherToPromMatcher(m) + if err != nil { + return nil, err + } + res = append(res, m) + } + return res, nil +} - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) +type MatcherConverter struct { + cache *cache.TwoQueueCache[LabelMatcher, *labels.Matcher] + cacheCapacity int + metrics *matcherConverterMetrics +} + +type matcherConverterMetrics struct { + cacheHitCount prometheus.Counter + cacheMissCount prometheus.Counter + cacheWriteCount prometheus.Counter + cacheSizeGauge prometheus.Gauge +} + +func newMatcherConverterMetrics(reg prometheus.Registerer) *matcherConverterMetrics { + var m matcherConverterMetrics + + m.cacheHitCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_matcher_converter_cache_hit_total", + Help: "Total number of cache hit.", + }) + m.cacheMissCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_matcher_converter_cache_miss_total", + Help: "Total number of cache miss.", + }) + m.cacheWriteCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_matcher_converter_cache_write_total", + Help: "Total number of cache write.", + }) + m.cacheSizeGauge = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_store_matcher_converter_cache_size", + Help: "Current size of the cache.", + }) + + return &m +} + +func NewMatcherConverter(cacheCapacity int, reg prometheus.Registerer) (*MatcherConverter, error) { + c, err := cache.New2Q[LabelMatcher, *labels.Matcher](cacheCapacity) + if err != nil { + return nil, err + } + metrics := newMatcherConverterMetrics(reg) + return &MatcherConverter{cache: c, cacheCapacity: cacheCapacity, metrics: metrics}, nil +} + +func (c *MatcherConverter) MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for _, m := range ms { + if pm, ok := c.cache.Get(m); ok { + // cache hit + c.metrics.cacheHitCount.Inc() + res = append(res, pm) + continue } - m, err := labels.NewMatcher(t, m.Name, m.Value) + // cache miss + c.metrics.cacheMissCount.Inc() + pm, err := MatcherToPromMatcher(m) if err != nil { return nil, err } - res = append(res, m) + c.cache.Add(m, pm) + c.metrics.cacheWriteCount.Inc() + res = append(res, pm) } + c.metrics.cacheSizeGauge.Set(float64(c.cache.Len())) return res, nil }