allow compactor to use bucket index when syncing metas #5641

Closed · wants to merge 7 commits
9 changes: 9 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -298,4 +298,13 @@ compactor:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]

# When enabled and the bucket index is also enabled in the bucket store, the
# syncer uses the bucket index metadata fetcher.
# CLI flag: -compactor.bucket-index-metadata-fetcher-enabled
[bucket_index_metadata_fetcher_enabled: <boolean> | default = false]

# When enabled, the compactor wraps its bucket client with a caching bucket.
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]
```
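
For illustration, the two new options map onto plain boolean fields carrying the yaml tags shown further down in this diff. Here is a minimal, self-contained sketch (a hypothetical stand-alone program, not part of the PR) of how such a snippet unmarshals:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// compactorOptions is a hypothetical stand-in for the real compactor Config,
// reusing the same yaml tags introduced by this PR.
type compactorOptions struct {
	BucketIndexMetadataFetcherEnabled bool `yaml:"bucket_index_metadata_fetcher_enabled"`
	CachingBucketEnabled              bool `yaml:"caching_bucket_enabled"`
}

func main() {
	raw := []byte("bucket_index_metadata_fetcher_enabled: true\ncaching_bucket_enabled: true\n")

	var opts compactorOptions
	if err := yaml.Unmarshal(raw, &opts); err != nil {
		panic(err)
	}
	// Both options default to false; here they are explicitly enabled.
	fmt.Printf("%+v\n", opts)
}
```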
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1990,6 +1990,15 @@ sharding_ring:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]

# When enabled and the bucket index is also enabled in the bucket store, the
# syncer uses the bucket index metadata fetcher.
# CLI flag: -compactor.bucket-index-metadata-fetcher-enabled
[bucket_index_metadata_fetcher_enabled: <boolean> | default = false]

# When enabled, the compactor wraps its bucket client with a caching bucket.
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]
```

### `configs_config`
109 changes: 76 additions & 33 deletions pkg/compactor/compactor.go
@@ -4,6 +4,8 @@ import (
"context"
"flag"
"fmt"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/thanos-io/thanos/pkg/extprom"
"hash/fnv"
"math/rand"
"os"
@@ -209,6 +211,9 @@ type Config struct {
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`

BucketIndexMetadataFetcherEnabled bool `yaml:"bucket_index_metadata_fetcher_enabled"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}

// RegisterFlags registers the Compactor flags.
@@ -247,6 +252,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated during compaction.")

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.BucketIndexMetadataFetcherEnabled, "compactor.bucket-index-metadata-fetcher-enabled", false, "When enabled and the bucket index is also enabled in the bucket store, the syncer uses the bucket index metadata fetcher.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, the compactor wraps its bucket client with a caching bucket.")
}

func (cfg *Config) Validate(limits validation.Limits) error {
@@ -320,6 +327,9 @@ type Compactor struct {
ringSubservices *services.Manager
ringSubservicesWatcher *services.FailureWatcher

// Sharding strategy used by this compactor to filter users and blocks.
shardingStrategy storegateway.ShardingStrategy

// Metrics.
compactionRunsStarted prometheus.Counter
compactionRunsInterrupted prometheus.Counter
@@ -474,6 +484,30 @@ func newCompactor(
if len(compactorCfg.DisabledTenants) > 0 {
level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
}
var err error
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring")
}
// Instantiate the configured sharding strategy.
switch c.compactorCfg.ShardingStrategy {
case util.ShardingStrategyDefault:
c.shardingStrategy = NewDefaultShardingStrategy(c.ring, c.ringLifecycler.Addr, logger, c.allowedTenants)
case util.ShardingStrategyShuffle:
c.shardingStrategy = NewShuffleShardingStrategy(c.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, c.allowedTenants, c.compactorCfg.ShardingRing.ZoneStableShuffleSharding)
default:
return nil, errInvalidShardingStrategy
}
} else {
c.shardingStrategy = storegateway.NewNoShardingStrategy(logger, c.allowedTenants)
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)

@@ -502,6 +536,14 @@ func (c *Compactor) starting(ctx context.Context) error {
// Wrap the bucket client to write block deletion marks in the global location too.
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

// Wrap the bucket client with a caching layer if the caching bucket is enabled.
if c.compactorCfg.CachingBucketEnabled {
c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
}

// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)

@@ -516,17 +558,6 @@ func (c *Compactor) starting(ctx context.Context) error {

// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
if err != nil {
return errors.Wrap(err, "unable to initialize compactor ring")
}

c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
if err == nil {
c.ringSubservicesWatcher = services.NewFailureWatcher()
@@ -570,7 +601,6 @@ func (c *Compactor) starting(ctx context.Context) error {
}
}
}

// Ensure an initial cleanup occurred before starting the compactor.
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
c.ringSubservices.StopAsync()
@@ -789,28 +819,41 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
// Filters out blocks with a no-compaction marker; blocks can be marked as no-compaction for reasons like
// out of order chunks or index file too big.
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

fetcher, err := block.NewMetaFetcher(
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
[]block.MetadataFilter{
// Remove the ingester ID because we don't shard blocks anymore, while still
// honoring the shard ID if sharding was done in the past.
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
ignoreDeletionMarkFilter,
deduplicateBlocksFilter,
noCompactMarkerFilter,
},
)
if err != nil {
return err
}

var fetcher block.MetadataFetcher
var err error
filters := []block.MetadataFilter{
// Remove the ingester ID because we don't shard blocks anymore, while still
// honoring the shard ID if sharding was done in the past.
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
ignoreDeletionMarkFilter,
deduplicateBlocksFilter,
noCompactMarkerFilter,
}
if c.storageCfg.BucketStore.BucketIndex.Enabled && c.compactorCfg.BucketIndexMetadataFetcherEnabled {
fetcher = storegateway.NewBucketIndexMetadataFetcher(
userID,
c.bucketClient,
c.shardingStrategy,
c.limits,
ulogger,
reg,
filters,
)
} else {
fetcher, err = block.NewMetaFetcher(
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
filters,
)
if err != nil {
return err
}
}

syncer, err := compact.NewMetaSyncer(
ulogger,
reg,
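
Restating the fetcher selection above in isolation: the bucket index metadata fetcher is used only when the bucket index is enabled in the bucket store and the new compactor flag is set; otherwise the classic meta fetcher keeps listing per-block meta.json files. A simplified sketch of that decision (illustrative types, not the PR's code):

```go
package main

import "fmt"

// fetcherKind mirrors the condition used in compactUser:
// storageCfg.BucketStore.BucketIndex.Enabled && compactorCfg.BucketIndexMetadataFetcherEnabled.
func fetcherKind(bucketIndexEnabled, bucketIndexFetcherEnabled bool) string {
	if bucketIndexEnabled && bucketIndexFetcherEnabled {
		return "BucketIndexMetadataFetcher"
	}
	return "MetaFetcher"
}

func main() {
	fmt.Println(fetcherKind(true, true))  // BucketIndexMetadataFetcher
	fmt.Println(fetcherKind(true, false)) // MetaFetcher: new flag disabled
	fmt.Println(fetcherKind(false, true)) // MetaFetcher: no bucket index to read
}
```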
4 changes: 3 additions & 1 deletion pkg/compactor/compactor_ring.go
@@ -39,7 +39,8 @@ type RingConfig struct {

WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

ObservePeriod time.Duration `yaml:"-"`
ObservePeriod time.Duration `yaml:"-"`
ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding" doc:"hidden"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -70,6 +71,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Timeout durations
f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
f.BoolVar(&cfg.ZoneStableShuffleSharding, "compactor.ring.zone-stable-shuffle-sharding", false, "If true, use zone stable shuffle sharding algorithm. Otherwise, use the default shuffle sharding algorithm.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
169 changes: 169 additions & 0 deletions pkg/compactor/sharding_strategy.go
@@ -0,0 +1,169 @@
package compactor

import (
"context"
"hash/fnv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
shardExcludedMeta = "shard-excluded"
)

func filterDisallowedTenants(userIDs []string, logger log.Logger, allowedTenants *util.AllowedTenants) []string {
filteredUserIDs := []string{}
for _, userID := range userIDs {
if !allowedTenants.IsAllowed(userID) {
level.Debug(logger).Log("msg", "ignoring user, not allowed", "user", userID)
continue
}

filteredUserIDs = append(filteredUserIDs, userID)
}

return filteredUserIDs
}

// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
type NoShardingStrategy struct {
logger log.Logger
allowedTenants *util.AllowedTenants
}

func NewNoShardingStrategy(logger log.Logger, allowedTenants *util.AllowedTenants) *NoShardingStrategy {
return &NoShardingStrategy{
logger: logger,
allowedTenants: allowedTenants,
}
}

func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants)
}

func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error {
return nil
}

// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by compactors.
// Not goroutine safe.
type DefaultShardingStrategy struct {
r *ring.Ring
instanceAddr string
logger log.Logger
allowedTenants *util.AllowedTenants
}

// NewDefaultShardingStrategy creates DefaultShardingStrategy.
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger, allowedTenants *util.AllowedTenants) *DefaultShardingStrategy {
return &DefaultShardingStrategy{
r: r,
instanceAddr: instanceAddr,
logger: logger,

allowedTenants: allowedTenants,
}
}

// FilterUsers implements ShardingStrategy.
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
var filteredIDs []string
for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) {
// Hash the user ID.
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(userID))
userHash := hasher.Sum32()
// Check whether this compactor instance owns the user.
rs, err := s.r.Get(userHash, RingOp, nil, nil, nil)
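// Skip the user when the ring lookup fails or ownership is ambiguous; it will be re-evaluated on the next compaction cycle.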
if err != nil {
continue
}
if len(rs.Instances) != 1 {
continue
}
if rs.Instances[0].Addr == s.instanceAddr {
filteredIDs = append(filteredIDs, userID)
}
}
return filteredIDs
}

// FilterBlocks implements ShardingStrategy.
func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error {
return nil
}

// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by compactors,
// where each tenant's blocks are compacted by a subset of compactor instances.
type ShuffleShardingStrategy struct {
r *ring.Ring
instanceID string
instanceAddr string
limits *validation.Overrides
logger log.Logger

zoneStableShuffleSharding bool
allowedTenants *util.AllowedTenants
}

// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits *validation.Overrides, logger log.Logger, allowedTenants *util.AllowedTenants, zoneStableShuffleSharding bool) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
r: r,
instanceID: instanceID,
instanceAddr: instanceAddr,
limits: limits,
logger: logger,

zoneStableShuffleSharding: zoneStableShuffleSharding,
allowedTenants: allowedTenants,
}
}

// FilterUsers implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
var filteredIDs []string
for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) {
subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding)

// Include the user only if it belongs to this compactor's shard.
if subRing.HasInstance(s.instanceID) {
filteredIDs = append(filteredIDs, userID)
}
}

return filteredIDs
}

// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error {
return nil
}

// GetShuffleShardingSubring returns the subring to be used for a given user. All compactor
// components should use this function to guarantee the same sharding logic is applied.
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits *validation.Overrides, zoneStableShuffleSharding bool) ring.ReadRing {
shardSize := limits.CompactorTenantShardSize(userID)

// A shard size of 0 means shuffle sharding is disabled for this specific user,
// so we just return the full ring so that blocks will be sharded across all compactors.
if shardSize <= 0 {
return ring
}

if zoneStableShuffleSharding {
// Zone stability is required when shuffle sharding is used with zone awareness, see
// https://github.com/cortexproject/cortex/issues/5467 for more details.
return ring.ShuffleShardWithZoneStability(userID, shardSize)
}
return ring.ShuffleShard(userID, shardSize)
}
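
For reference, the ring token that DefaultShardingStrategy.FilterUsers uses to decide user ownership is a 32-bit FNV-1a hash of the tenant ID. A tiny stand-alone sketch of that derivation (illustrative only, not part of the PR):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// userToken mirrors the hashing in DefaultShardingStrategy.FilterUsers: the
// tenant ID is hashed with 32-bit FNV-1a and the resulting token is looked up
// in the compactor ring to find the owning instance.
func userToken(userID string) uint32 {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(userID))
	return hasher.Sum32()
}

func main() {
	for _, user := range []string{"tenant-a", "tenant-b"} {
		fmt.Printf("user %q -> ring token %d\n", user, userToken(user))
	}
}
```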