Skip to content

Commit

Permalink
Merge pull request #23 from jnyi/PLAT-104290
Browse files Browse the repository at this point in the history
[PLAT-104290] auto detect/remove overlapping blocks to be compacted to avoid compactor panics
  • Loading branch information
jnyi authored Mar 22, 2024
2 parents 4ed8a45 + cfc344f commit 3e50e25
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 85 deletions.
8 changes: 7 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,14 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
logger,
sy,
grouper,
planner,
comp,
compact.DefaultBlockDeletableChecker{},
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
compactDir,
insBkt,
conf.compactionConcurrency,
Expand Down Expand Up @@ -710,6 +712,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
enableOverlappingRemoval bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
progressCalculateInterval time.Duration
Expand Down Expand Up @@ -786,6 +789,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.enable-overlapping-removal", "In house flag to remove overlapping blocks. Turn this on to fix https://github.com/thanos-io/thanos/issues/6775.").
Default("false").BoolVar(&cc.enableOverlappingRemoval)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Expand Down
18 changes: 2 additions & 16 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas))
for id, m := range resp.metas {
metas[id] = m
if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok {
numBlocksByTenant[tenant]++
} else {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
numBlocksByTenant[metadata.DefaultTenant]++
}
numBlocksByTenant[m.Thanos.GetTenant()]++
}

for tenant, numBlocks := range numBlocksByTenant {
Expand All @@ -616,15 +610,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
// Therefore, it's skipped to update the gauge.
if len(filters) > 0 {
for _, m := range metas {
var tenant string
var ok bool
// tenant and replica will have the zero value ("") if the key is not in the map.
if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
tenant = metadata.DefaultTenant
}
metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
metrics.Assigned.WithLabelValues(m.Thanos.GetTenant(), strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
}
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ package metadata
// this package.

import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"

"github.com/go-kit/log"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -115,12 +116,21 @@ func (m *Thanos) ParseExtensions(v any) (any, error) {
return ConvertExtensions(m.Extensions, v)
}

func (m *Thanos) GetTenant() string {
if tenant, ok := m.Labels[TenantLabel]; ok {
return tenant
} else {
return DefaultTenant
}
}

func (m *Thanos) GetLabels() string {
b := new(bytes.Buffer)
res := make([]string, 0, len(m.Labels))
for k, v := range m.Labels {
fmt.Fprintf(b, "%s=%s,", k, v)
res = append(res, fmt.Sprintf("%s=%s", k, v))
}
return b.String()
sort.Strings(res)
return strings.Join(res, ",")
}

// ConvertExtensions converts extensions with `any` type into specific type `v`
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestMeta_GetLabels(t *testing.T) {
Labels: map[string]string{"a": "b", "c": "d"},
},
}
testutil.Equals(t, "a=b,c=d,", m.Thanos.GetLabels())
testutil.Equals(t, "a=b,c=d", m.Thanos.GetLabels())
}

type TestExtensions struct {
Expand Down
65 changes: 2 additions & 63 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ type DefaultGrouper struct {
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
overlappingBlocks *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand Down Expand Up @@ -284,10 +283,6 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"resolution"}),
overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}, []string{"resolution", "level"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand All @@ -308,7 +303,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted *prometheus.CounterVec,
compactionFailures *prometheus.CounterVec,
verticalCompactions *prometheus.CounterVec,
overrlappingBlocks *prometheus.CounterVec,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand All @@ -326,7 +320,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overrlappingBlocks,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand Down Expand Up @@ -359,7 +352,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.compactionRunsCompleted.WithLabelValues(resolutionLabel),
g.compactionFailures.WithLabelValues(resolutionLabel),
g.verticalCompactions.WithLabelValues(resolutionLabel),
g.overlappingBlocks.WithLabelValues(resolutionLabel, fmt.Sprintf("%d", m.Compaction.Level)),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
Expand Down Expand Up @@ -400,7 +392,6 @@ type Group struct {
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
overlappingBlocks prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand All @@ -424,7 +415,6 @@ func NewGroup(
compactionRunsCompleted prometheus.Counter,
compactionFailures prometheus.Counter,
verticalCompactions prometheus.Counter,
overlappingBlocks prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand Down Expand Up @@ -453,7 +443,6 @@ func NewGroup(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overlappingBlocks,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
Expand Down Expand Up @@ -1031,53 +1020,6 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada
return nil
}

func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string) error {
if len(toCompact) == 0 {
return nil
}
kept := toCompact[0]
for _, m := range toCompact {
if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first",
"first", kept.String(), "block", m.String())
kept = m
} else if (m.MinTime < kept.MinTime && kept.MinTime < m.MaxTime) ||
(m.MinTime < kept.MaxTime && kept.MaxTime < m.MaxTime) {
err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String())
if cg.enableVerticalCompaction {
level.Error(cg.logger).Log("msg", "best effort to vertical compact", "err", err)
return nil
} else {
return halt(err)
}
}
}
for _, m := range toCompact {
if m.ULID.Compare(kept.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource {
level.Info(cg.logger).Log("msg", "keep this overlapping block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
continue
}
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir); err != nil {
return retry(err)
}
}
return retry(errors.Errorf("found overlapping blocks in plan. Only kept %s", kept.String()))
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}

// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error {
ie, ok := errors.Cause(issue347Err).(Issue347Error)
Expand Down Expand Up @@ -1171,12 +1113,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := groupCompactionBegin

if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err)
// instead of halting, we attempt to remove overlapped blocks and only keep the longest one.
if newErr := cg.removeOverlappingBlocks(ctx, toCompact, dir); newErr != nil {
return false, ulid.ULID{}, newErr
}
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
}
toCompact = FilterRemovedBlocks(toCompact)
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

begin = time.Now()
Expand Down
120 changes: 120 additions & 0 deletions pkg/compact/overlapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
}

func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
if enabled {
return OverlappingCompactionLifecycleCallback{
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}),
}
}
return DefaultCompactionLifecycleCallback{}
}

// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner
// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others.
func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error {
if len(toCompact) == 0 {
return nil
}
prev := 0
for curr, currB := range toCompact {
prevB := toCompact[prev]
if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime {
// no overlapping with previous blocks, skip it
prev = curr
continue
} else if currB.MinTime < prevB.MinTime {
// halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String()))
} else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
if cg.enableVerticalCompaction {
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err)
prev = curr
continue
} else {
return halt(err)
}
} else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime {
if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples {
level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks",
"prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples,
"curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples,
)
prev = curr
continue
}
}
// prev min <= curr min < prev max
toDelete := -1
if prevB.MaxTime >= currB.MaxTime {
toDelete = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block",
"toKeep", prevB.String(), "toDelete", currB.String())
} else if prevB.MaxTime < currB.MaxTime {
toDelete = prev
prev = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block",
"toKeep", currB.String(), "toDelete", prevB.String())
}
o.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil {
return retry(err)
}
toCompact[toDelete] = nil
}
return nil
}

func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
return nil
}

func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
return tsdb.DefaultBlockPopulator{}, nil
}

func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
for _, b := range blocks {
if b != nil {
res = append(res, b)
}
}
return res
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents),
"resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(),
"series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks)
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
return nil
}
Loading

0 comments on commit 3e50e25

Please sign in to comment.