Skip to content

Commit

Permalink
add matcher cache for proxy store API
Browse files Browse the repository at this point in the history
  • Loading branch information
yuchen-db committed Dec 12, 2024
1 parent 1c69c7e commit 8d8aa94
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 17 deletions.
10 changes: 7 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func runReceive(
options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithoutDedup(),
store.WithMatcherConverter(conf.matcherConverterCacheCapacity, reg),
}

proxy := store.NewProxyStore(
Expand Down Expand Up @@ -932,9 +933,10 @@ type receiveConfig struct {

asyncForwardWorkerCount uint

numTopMetricsPerTenant int
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
numTopMetricsPerTenant int
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int

featureList *[]string
}
Expand Down Expand Up @@ -1097,6 +1099,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("10000").Uint64Var(&rc.topMetricsMinimumCardinality)
cmd.Flag("receive.top-metrics-update-interval", "The interval at which the top metrics are updated.").
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API.").
Default("1000").IntVar(&rc.matcherConverterCacheCapacity)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type ProxyStore struct {
tsdbSelector *TSDBSelector
quorumChunkDedup bool
enableDedup bool
matcherConverter *storepb.MatcherConverter
}

type proxyStoreMetrics struct {
Expand Down Expand Up @@ -162,6 +163,18 @@ func WithoutDedup() ProxyStoreOption {
}
}

func WithMatcherConverter(cacheCapacity int, reg prometheus.Registerer) ProxyStoreOption {
return func(s *ProxyStore) {
matcherConverter, err := storepb.NewMatcherConverter(cacheCapacity, reg)
if err != nil {
level.Error(s.logger).Log("msg", "failed to create matcher converter", "err", err)
return
}
level.Info(s.logger).Log("msg", "created matcher converter", "cache_capacity", cacheCapacity)
s.matcherConverter = matcherConverter
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down
107 changes: 93 additions & 14 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"sort"
"strconv"
"strings"

"github.com/gogo/protobuf/types"
cache "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -381,31 +384,107 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
return res, nil
}

func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
var t labels.MatchType

switch m.Type {
case LabelMatcher_EQ:
t = labels.MatchEqual
case LabelMatcher_NEQ:
t = labels.MatchNotEqual
case LabelMatcher_RE:
t = labels.MatchRegexp
case LabelMatcher_NRE:
t = labels.MatchNotRegexp
default:
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
}
pm, err := labels.NewMatcher(t, m.Name, m.Value)
if err != nil {
return nil, err
}
return pm, nil
}

// MatchersToPromMatchers returns Prometheus matchers from proto matchers.
// NOTE: It allocates memory.
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
var t labels.MatchType
m, err := MatcherToPromMatcher(m)
if err != nil {
return nil, err
}
res = append(res, m)
}
return res, nil
}

switch m.Type {
case LabelMatcher_EQ:
t = labels.MatchEqual
case LabelMatcher_NEQ:
t = labels.MatchNotEqual
case LabelMatcher_RE:
t = labels.MatchRegexp
case LabelMatcher_NRE:
t = labels.MatchNotRegexp
default:
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
type MatcherConverter struct {
cache *cache.TwoQueueCache[LabelMatcher, *labels.Matcher]
cacheCapacity int
metrics *matcherConverterMetrics
}

type matcherConverterMetrics struct {
cacheHitCount prometheus.Counter
cacheMissCount prometheus.Counter
cacheWriteCount prometheus.Counter
cacheSizeGauge prometheus.Gauge
}

func newMatcherConverterMetrics(reg prometheus.Registerer) *matcherConverterMetrics {
var m matcherConverterMetrics

m.cacheHitCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_matcher_converter_cache_hit_total",
Help: "Total number of cache hit.",
})
m.cacheMissCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_matcher_converter_cache_miss_total",
Help: "Total number of cache miss.",
})
m.cacheWriteCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_matcher_converter_cache_write_total",
Help: "Total number of cache write.",
})
m.cacheSizeGauge = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_matcher_converter_cache_size",
Help: "Current size of the cache.",
})

return &m
}

func NewMatcherConverter(cacheCapacity int, reg prometheus.Registerer) (*MatcherConverter, error) {
c, err := cache.New2Q[LabelMatcher, *labels.Matcher](cacheCapacity)
if err != nil {
return nil, err
}
metrics := newMatcherConverterMetrics(reg)
return &MatcherConverter{cache: c, cacheCapacity: cacheCapacity, metrics: metrics}, nil
}

func (c *MatcherConverter) MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
if pm, ok := c.cache.Get(m); ok {
// cache hit
c.metrics.cacheHitCount.Inc()
res = append(res, pm)
continue
}
m, err := labels.NewMatcher(t, m.Name, m.Value)
// cache miss
c.metrics.cacheMissCount.Inc()
pm, err := MatcherToPromMatcher(m)
if err != nil {
return nil, err
}
res = append(res, m)
c.cache.Add(m, pm)
c.metrics.cacheWriteCount.Inc()
res = append(res, pm)
}
c.metrics.cacheSizeGauge.Set(float64(c.cache.Len()))
return res, nil
}

Expand Down

0 comments on commit 8d8aa94

Please sign in to comment.