Skip to content
This repository has been archived by the owner on Jan 12, 2023. It is now read-only.

Commit

Permalink
Fix issues around multiple registrations (cortexproject#2309)
Browse files Browse the repository at this point in the history
Move all the caches up and don't create new ones for each schema.

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve authored Mar 20, 2020
1 parent a1fc64d commit fcf6640
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 31 deletions.
4 changes: 1 addition & 3 deletions pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
}

func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
fetcher, err := chunk.NewChunkFetcher(cache.Config{
Cache: c,
}, false, nil)
fetcher, err := chunk.NewChunkFetcher(c, false, nil)
require.NoError(t, err)
defer fetcher.Stop()

Expand Down
5 changes: 4 additions & 1 deletion pkg/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Reg
serverList: selector,
hostname: cfg.Host,
service: cfg.Service,
addresses: strings.Split(cfg.Addresses, ","),
provider: dns.NewProvider(util.Logger, dnsProviderRegisterer, dns.GolangResolverType),
quit: make(chan struct{}),

numServers: memcacheServersDiscovered.WithLabelValues(name),
}

if len(cfg.Addresses) > 0 {
newClient.addresses = strings.Split(cfg.Addresses, ",")
}

err := newClient.updateMemcacheServers()
if err != nil {
level.Error(util.Logger).Log("msg", "error setting memcache servers to host", "host", cfg.Host, "err", err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/chunk/cache/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, b
func (m *mockCache) Stop() {
}

// NewMockCache makes a new MockCache
// NewMockCache makes a new MockCache.
func NewMockCache() Cache {
return &mockCache{
cache: map[string][]byte{},
}
}

// NewNoopCache returns a no-op cache.
func NewNoopCache() Cache {
return NewTiered(nil)
}
4 changes: 2 additions & 2 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type store struct {
*Fetcher
}

func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) {
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

chunksCache, err := cache.New(storeCfg.ChunkCacheConfig)
require.NoError(t, err)
writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
require.NoError(t, err)

store := NewCompositeStore()
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache)
require.NoError(t, err)
return store
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,10 @@ type decodeResponse struct {
}

// NewChunkFetcher makes a new ChunkFetcher.
func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage Client) (*Fetcher, error) {
cfg.Prefix = "chunks"
cache, err := cache.New(cfg)
if err != nil {
return nil, err
}

func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error) {
c := &Fetcher{
storage: storage,
cache: cache,
cache: cacher,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk/cache"
)

// StoreLimits helps get Limits specific to Queries for Stores
Expand Down Expand Up @@ -56,15 +58,15 @@ func NewCompositeStore() CompositeStore {
}

// AddPeriod adds the configuration for a period of time to the CompositeStore
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits) error {
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
schema := cfg.CreateSchema()
var store Store
var err error
switch cfg.Schema {
case "v9", "v10", "v11":
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits, chunksCache, writeDedupeCache)
default:
store, err = newStore(storeCfg, schema, index, chunks, limits)
store, err = newStore(storeCfg, schema, index, chunks, limits, chunksCache)
}
if err != nil {
return err
Expand Down
9 changes: 2 additions & 7 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,8 @@ type seriesStore struct {
writeDedupeCache cache.Cache
}

func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}

writeDedupeCache, err := cache.New(cfg.WriteDedupeCacheConfig)
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) {
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,28 @@ func (cfg *Config) Validate() error {

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)
indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
if err != nil {
return nil, err
}

writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
if err != nil {
return nil, err
}

chunkCacheCfg := storeCfg.ChunkCacheConfig
chunkCacheCfg.Prefix = "chunks"
chunksCache, err := cache.New(chunkCacheCfg)
if err != nil {
return nil, err
}

// Cache is shared by multiple stores, which means they will try and Stop
// it more than once. Wrap in a StopOnce to prevent this.
tieredCache = cache.StopOnce(tieredCache)
indexReadCache = cache.StopOnce(indexReadCache)
chunksCache = cache.StopOnce(chunksCache)
writeDedupeCache = cache.StopOnce(writeDedupeCache)

err = schemaCfg.Load()
if err != nil {
Expand All @@ -115,7 +129,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
if err != nil {
return nil, errors.Wrap(err, "error creating index client")
}
index = newCachingIndexClient(index, tieredCache, cfg.IndexCacheValidity, limits)
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits)

objectStoreType := s.ObjectType
if objectStoreType == "" {
Expand All @@ -126,7 +140,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
return nil, errors.Wrap(err, "error creating object client")
}

err = stores.AddPeriod(storeCfg, s, index, chunks, limits)
err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunk/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -135,7 +136,7 @@ func SetupTestChunkStore() (chunk.Store, error) {
flagext.DefaultValues(&storeCfg)

store := chunk.NewCompositeStore()
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, cache.NewNoopCache(), cache.NewNoopCache())
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fcf6640

Please sign in to comment.