Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLAT-104290] auto detect/remove overlapping blocks to be compacted to avoid compactor panics #23

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,14 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
logger,
sy,
grouper,
planner,
comp,
compact.DefaultBlockDeletableChecker{},
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
compactDir,
insBkt,
conf.compactionConcurrency,
Expand Down Expand Up @@ -710,6 +712,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
enableOverlappingRemoval bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
progressCalculateInterval time.Duration
Expand Down Expand Up @@ -786,6 +789,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.enable-overlapping-removal", "In house flag to remove overlapping blocks. Turn this on to fix https://github.com/thanos-io/thanos/issues/6775.").
Default("false").BoolVar(&cc.enableOverlappingRemoval)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Expand Down
18 changes: 2 additions & 16 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas))
for id, m := range resp.metas {
metas[id] = m
if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok {
numBlocksByTenant[tenant]++
} else {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
Copy link
Collaborator Author

@jnyi jnyi Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is verbose and not useful in testing

"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
numBlocksByTenant[metadata.DefaultTenant]++
}
numBlocksByTenant[m.Thanos.GetTenant()]++
}

for tenant, numBlocks := range numBlocksByTenant {
Expand All @@ -616,15 +610,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
// Therefore, it's skipped to update the gauge.
if len(filters) > 0 {
for _, m := range metas {
var tenant string
var ok bool
// tenant and replica will have the zero value ("") if the key is not in the map.
if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
tenant = metadata.DefaultTenant
}
metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
metrics.Assigned.WithLabelValues(m.Thanos.GetTenant(), strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
}
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ package metadata
// this package.

import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"

"github.com/go-kit/log"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -115,12 +116,21 @@ func (m *Thanos) ParseExtensions(v any) (any, error) {
return ConvertExtensions(m.Extensions, v)
}

func (m *Thanos) GetTenant() string {
if tenant, ok := m.Labels[TenantLabel]; ok {
return tenant
} else {
return DefaultTenant
}
}

func (m *Thanos) GetLabels() string {
b := new(bytes.Buffer)
res := make([]string, 0, len(m.Labels))
for k, v := range m.Labels {
fmt.Fprintf(b, "%s=%s,", k, v)
res = append(res, fmt.Sprintf("%s=%s", k, v))
}
return b.String()
sort.Strings(res)
return strings.Join(res, ",")
}

// ConvertExtensions converts extensions with `any` type into specific type `v`
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestMeta_GetLabels(t *testing.T) {
Labels: map[string]string{"a": "b", "c": "d"},
},
}
testutil.Equals(t, "a=b,c=d,", m.Thanos.GetLabels())
testutil.Equals(t, "a=b,c=d", m.Thanos.GetLabels())
}

type TestExtensions struct {
Expand Down
65 changes: 2 additions & 63 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ type DefaultGrouper struct {
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
overlappingBlocks *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand Down Expand Up @@ -284,10 +283,6 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"resolution"}),
overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this metric to its own source file, this will avoid conflicts when merge from upstream.

Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}, []string{"resolution", "level"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand All @@ -308,7 +303,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted *prometheus.CounterVec,
compactionFailures *prometheus.CounterVec,
verticalCompactions *prometheus.CounterVec,
overrlappingBlocks *prometheus.CounterVec,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand All @@ -326,7 +320,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overrlappingBlocks,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand Down Expand Up @@ -359,7 +352,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.compactionRunsCompleted.WithLabelValues(resolutionLabel),
g.compactionFailures.WithLabelValues(resolutionLabel),
g.verticalCompactions.WithLabelValues(resolutionLabel),
g.overlappingBlocks.WithLabelValues(resolutionLabel, fmt.Sprintf("%d", m.Compaction.Level)),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
Expand Down Expand Up @@ -400,7 +392,6 @@ type Group struct {
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
overlappingBlocks prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand All @@ -424,7 +415,6 @@ func NewGroup(
compactionRunsCompleted prometheus.Counter,
compactionFailures prometheus.Counter,
verticalCompactions prometheus.Counter,
overlappingBlocks prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand Down Expand Up @@ -453,7 +443,6 @@ func NewGroup(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overlappingBlocks,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
Expand Down Expand Up @@ -1031,53 +1020,6 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada
return nil
}

func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string) error {
if len(toCompact) == 0 {
return nil
}
kept := toCompact[0]
for _, m := range toCompact {
if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first",
"first", kept.String(), "block", m.String())
kept = m
} else if (m.MinTime < kept.MinTime && kept.MinTime < m.MaxTime) ||
(m.MinTime < kept.MaxTime && kept.MaxTime < m.MaxTime) {
err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String())
if cg.enableVerticalCompaction {
level.Error(cg.logger).Log("msg", "best effort to vertical compact", "err", err)
return nil
} else {
return halt(err)
}
}
}
for _, m := range toCompact {
if m.ULID.Compare(kept.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource {
level.Info(cg.logger).Log("msg", "keep this overlapping block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
continue
}
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir); err != nil {
return retry(err)
}
}
return retry(errors.Errorf("found overlapping blocks in plan. Only kept %s", kept.String()))
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}

// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error {
ie, ok := errors.Cause(issue347Err).(Issue347Error)
Expand Down Expand Up @@ -1171,12 +1113,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := groupCompactionBegin

if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err)
// instead of halting, we attempt to remove overlapped blocks and only keep the longest one.
if newErr := cg.removeOverlappingBlocks(ctx, toCompact, dir); newErr != nil {
return false, ulid.ULID{}, newErr
}
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
}
toCompact = FilterRemovedBlocks(toCompact)
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

begin = time.Now()
Expand Down
120 changes: 120 additions & 0 deletions pkg/compact/overlapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
}

func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
if enabled {
return OverlappingCompactionLifecycleCallback{
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}),
}
}
return DefaultCompactionLifecycleCallback{}
}

// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner
// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others.
func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error {
if len(toCompact) == 0 {
return nil
}
prev := 0
for curr, currB := range toCompact {
prevB := toCompact[prev]
if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime {
// no overlapping with previous blocks, skip it
prev = curr
continue
} else if currB.MinTime < prevB.MinTime {
// halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String()))
} else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
if cg.enableVerticalCompaction {
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err)
prev = curr
continue
} else {
return halt(err)
}
} else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime {
if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples {
level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks",
"prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples,
"curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples,
)
prev = curr
continue
}
}
// prev min <= curr min < prev max
toDelete := -1
if prevB.MaxTime >= currB.MaxTime {
toDelete = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block",
"toKeep", prevB.String(), "toDelete", currB.String())
} else if prevB.MaxTime < currB.MaxTime {
toDelete = prev
prev = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block",
"toKeep", currB.String(), "toDelete", prevB.String())
}
o.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil {
return retry(err)
}
toCompact[toDelete] = nil
}
return nil
}

func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
return nil
}

func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
return tsdb.DefaultBlockPopulator{}, nil
}

func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
for _, b := range blocks {
if b != nil {
res = append(res, b)
}
}
return res
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents),
"resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(),
"series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks)
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
return nil
}
Loading
Loading