diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 374c192bd7..16810ebea0 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -298,4 +298,13 @@ compactor: # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] + + # When enabled and bucket index is also enabled in bucket store, bucket index + # metadata fetcher will be used in syncer + # CLI flag: -compactor.bucket-index-metadata-fetcher-enabled + [bucket_index_metadata_fetcher_enabled: | default = false] + + # When enabled, caching bucket will be used + # CLI flag: -compactor.caching-bucket-enabled + [caching_bucket_enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index fbcc159477..fdd5c64345 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1990,6 +1990,15 @@ sharding_ring: # When enabled, index verification will ignore out of order label names. 
# CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] + +# When enabled and bucket index is also enabled in bucket store, bucket index +# metadata fetcher will be used in syncer +# CLI flag: -compactor.bucket-index-metadata-fetcher-enabled +[bucket_index_metadata_fetcher_enabled: | default = false] + +# When enabled, caching bucket will be used +# CLI flag: -compactor.caching-bucket-enabled +[caching_bucket_enabled: | default = false] ``` ### `configs_config` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 3f6757fbbc..d32baf2bac 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -4,6 +4,8 @@ import ( "context" "flag" "fmt" + "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/thanos-io/thanos/pkg/extprom" "hash/fnv" "math/rand" "os" @@ -209,6 +211,9 @@ type Config struct { BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` AcceptMalformedIndex bool `yaml:"accept_malformed_index"` + + BucketIndexMetadataFetcherEnabled bool `yaml:"bucket_index_metadata_fetcher_enabled"` + CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` } // RegisterFlags registers the Compactor flags. 
@@ -247,6 +252,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") + f.BoolVar(&cfg.BucketIndexMetadataFetcherEnabled, "compactor.bucket-index-metadata-fetcher-enabled", false, "When enabled and bucket index is also enabled in bucket store, bucket index metadata fetcher will be used in syncer") + f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used") } func (cfg *Config) Validate(limits validation.Limits) error { @@ -320,6 +327,9 @@ type Compactor struct { ringSubservices *services.Manager ringSubservicesWatcher *services.FailureWatcher + //sharding strategy + shardingStrategy storegateway.ShardingStrategy + // Metrics. 
compactionRunsStarted prometheus.Counter compactionRunsInterrupted prometheus.Counter @@ -474,6 +484,30 @@ func newCompactor( if len(compactorCfg.DisabledTenants) > 0 { level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", ")) } + var err error + if c.compactorCfg.ShardingEnabled { + lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() + c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) + if err != nil { + return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler") + } + + c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) + if err != nil { + return nil, errors.Wrap(err, "unable to initialize compactor ring") + } + // Instance the right strategy. + switch c.compactorCfg.ShardingStrategy { + case util.ShardingStrategyDefault: + c.shardingStrategy = NewDefaultShardingStrategy(c.ring, c.ringLifecycler.Addr, logger, c.allowedTenants) + case util.ShardingStrategyShuffle: + c.shardingStrategy = NewShuffleShardingStrategy(c.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, c.allowedTenants, c.compactorCfg.ShardingRing.ZoneStableShuffleSharding) + default: + return nil, errInvalidShardingStrategy + } + } else { + c.shardingStrategy = storegateway.NewNoShardingStrategy(logger, c.allowedTenants) + } c.Service = services.NewBasicService(c.starting, c.running, c.stopping) @@ -502,6 +536,14 @@ func (c *Compactor) starting(ctx context.Context) error { // Wrap the bucket client to write block deletion marks in the global location too. c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient) + // Wrap the bucket client with caching layer if caching bucket is enabled. 
+ if c.compactorCfg.CachingBucketEnabled { + c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + if err != nil { + return errors.Wrapf(err, "create caching bucket") + } + } + // Create the users scanner. c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger) @@ -516,17 +558,6 @@ func (c *Compactor) starting(ctx context.Context) error { // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { - lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() - c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) - if err != nil { - return errors.Wrap(err, "unable to initialize compactor ring lifecycler") - } - - c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) - if err != nil { - return errors.Wrap(err, "unable to initialize compactor ring") - } - c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring) if err == nil { c.ringSubservicesWatcher = services.NewFailureWatcher() @@ -570,7 +601,6 @@ func (c *Compactor) starting(ctx context.Context) error { } } } - // Ensure an initial cleanup occurred before starting the compactor. if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil { c.ringSubservices.StopAsync() @@ -789,28 +819,41 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { // Filters out blocks with no compaction maker; blocks can be marked as no compaction for reasons like // out of order chunks or index file too big. 
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency) - - fetcher, err := block.NewMetaFetcher( - ulogger, - c.compactorCfg.MetaSyncConcurrency, - bucket, - c.metaSyncDirForUser(userID), - reg, - // List of filters to apply (order matters). - []block.MetadataFilter{ - // Remove the ingester ID because we don't shard blocks anymore, while still - // honoring the shard ID if sharding was done in the past. - NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), - block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), - ignoreDeletionMarkFilter, - deduplicateBlocksFilter, - noCompactMarkerFilter, - }, - ) - if err != nil { - return err + var fetcher block.MetadataFetcher + var err error + filters := []block.MetadataFilter{ + // Remove the ingester ID because we don't shard blocks anymore, while still + // honoring the shard ID if sharding was done in the past. + NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), + block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), + ignoreDeletionMarkFilter, + deduplicateBlocksFilter, + noCompactMarkerFilter, + } + if c.storageCfg.BucketStore.BucketIndex.Enabled && c.compactorCfg.BucketIndexMetadataFetcherEnabled { + fetcher = storegateway.NewBucketIndexMetadataFetcher( + userID, + c.bucketClient, + c.shardingStrategy, + c.limits, + ulogger, + reg, + filters, + ) + } else { + fetcher, err = block.NewMetaFetcher( + ulogger, + c.compactorCfg.MetaSyncConcurrency, + bucket, + c.metaSyncDirForUser(userID), + reg, + // List of filters to apply (order matters). 
+ filters, + ) + if err != nil { + return err + } } - syncer, err := compact.NewMetaSyncer( ulogger, reg, diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index 2e236cefbe..5a6bf764f4 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -39,7 +39,8 @@ type RingConfig struct { WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"` - ObservePeriod time.Duration `yaml:"-"` + ObservePeriod time.Duration `yaml:"-"` + ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding" doc:"hidden"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -70,6 +71,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { // Timeout durations f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.") + f.BoolVar(&cfg.ZoneStableShuffleSharding, "compactor.ring.zone-stable-shuffle-sharding", false, "If true, use zone stable shuffle sharding algorithm. 
Otherwise, use the default shuffle sharding algorithm.") } // ToLifecyclerConfig returns a LifecyclerConfig based on the compactor diff --git a/pkg/compactor/sharding_strategy.go b/pkg/compactor/sharding_strategy.go new file mode 100644 index 0000000000..da79106060 --- /dev/null +++ b/pkg/compactor/sharding_strategy.go @@ -0,0 +1,169 @@ +package compactor + +import ( + "context" + "github.com/cortexproject/cortex/pkg/util/validation" + "hash/fnv" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + shardExcludedMeta = "shard-excluded" +) + +func filterDisallowedTenants(userIDs []string, logger log.Logger, allowedTenants *util.AllowedTenants) []string { + filteredUserIDs := []string{} + for _, userID := range userIDs { + if !allowedTenants.IsAllowed(userID) { + level.Debug(logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue + } + + filteredUserIDs = append(filteredUserIDs, userID) + } + + return filteredUserIDs +} + +// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out. 
+type NoShardingStrategy struct { + logger log.Logger + allowedTenants *util.AllowedTenants +} + +func NewNoShardingStrategy(logger log.Logger, allowedTenants *util.AllowedTenants) *NoShardingStrategy { + return &NoShardingStrategy{ + logger: logger, + allowedTenants: allowedTenants, + } +} + +func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) +} + +func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error { + return nil +} + +// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by compactors. +// Not go-routine safe. +type DefaultShardingStrategy struct { + r *ring.Ring + instanceAddr string + logger log.Logger + allowedTenants *util.AllowedTenants +} + +// NewDefaultShardingStrategy creates DefaultShardingStrategy. +func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger, allowedTenants *util.AllowedTenants) *DefaultShardingStrategy { + return &DefaultShardingStrategy{ + r: r, + instanceAddr: instanceAddr, + logger: logger, + + allowedTenants: allowedTenants, + } +} + +// FilterUsers implements ShardingStrategy. +func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + var filteredIDs []string + for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) { + // Hash the user ID. + hasher := fnv.New32a() + _, _ = hasher.Write([]byte(userID)) + userHash := hasher.Sum32() + // Check whether this compactor instance owns the user. + rs, err := s.r.Get(userHash, RingOp, nil, nil, nil) + if err != nil { + continue + } + if len(rs.Instances) != 1 { + continue + } + if rs.Instances[0].Addr == s.instanceAddr { + filteredIDs = append(filteredIDs, userID) + } + } + return filteredIDs +} + +// FilterBlocks implements ShardingStrategy.
+func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error { + return nil +} + +// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by compactors, +// where each tenant's blocks are sharded across a subset of compactor instances. +type ShuffleShardingStrategy struct { + r *ring.Ring + instanceID string + instanceAddr string + limits *validation.Overrides + logger log.Logger + + zoneStableShuffleSharding bool + allowedTenants *util.AllowedTenants +} + +// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. +func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits *validation.Overrides, logger log.Logger, allowedTenants *util.AllowedTenants, zoneStableShuffleSharding bool) *ShuffleShardingStrategy { + return &ShuffleShardingStrategy{ + r: r, + instanceID: instanceID, + instanceAddr: instanceAddr, + limits: limits, + logger: logger, + + zoneStableShuffleSharding: zoneStableShuffleSharding, + allowedTenants: allowedTenants, + } +} + +// FilterUsers implements ShardingStrategy. +func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + var filteredIDs []string + for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) { + subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) + + // Include the user only if it belongs to this compactor shard. + if subRing.HasInstance(s.instanceID) { + filteredIDs = append(filteredIDs, userID) + } + } + + return filteredIDs +} + +// FilterBlocks implements ShardingStrategy.
+func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error { + return nil +} + +// GetShuffleShardingSubring returns the subring to be used for a given user. This function +// should be used everywhere the compactor builds a tenant's subring, to guarantee the same logic is used. +func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits *validation.Overrides, zoneStableShuffleSharding bool) ring.ReadRing { + shardSize := limits.CompactorTenantShardSize(userID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + // so we just return the full ring so that blocks will be sharded across all compactors. + if shardSize <= 0 { + return ring + } + + if zoneStableShuffleSharding { + // Zone stability is required when shuffle sharding is enabled, see + // https://github.com/cortexproject/cortex/issues/5467 for more details. + return ring.ShuffleShardWithZoneStability(userID, shardSize) + } + return ring.ShuffleShard(userID, shardSize) +}