[PLAT-102919] auto delete overlapped blocks
Signed-off-by: Yi Jin <[email protected]>
jnyi committed Mar 12, 2024
1 parent cdd2623 commit bcf7ca5
Showing 4 changed files with 53 additions and 6 deletions.
6 changes: 6 additions & 0 deletions cmd/thanos/compact.go
@@ -107,6 +107,7 @@ type compactMetrics struct {
blocksCleaned prometheus.Counter
blockCleanupFailures prometheus.Counter
blocksMarked *prometheus.CounterVec
blocksOverlapped prometheus.Counter
garbageCollectedBlocks prometheus.Counter
}

@@ -153,6 +154,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker", "reason"})
m.blocksOverlapped = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_blocks_overlapped_total",
Help: "Total number of blocks detected overlapped in compactor.",
})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")
@@ -357,6 +362,7 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
compactMetrics.blocksOverlapped,
metadata.HashFunc(conf.hashFunc),
conf.blockFilesConcurrency,
conf.compactBlocksFetchConcurrency,
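For reference, the new thanos_compact_blocks_overlapped_total counter above follows the same promauto pattern as the compactor's existing counters. A minimal, self-contained sketch of that pattern (the registry wiring, the main function, and the increment call site below are illustrative assumptions, not code from this commit):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	// Same pattern as newCompactMetrics above: register a plain counter on the registry.
	blocksOverlapped := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_compact_blocks_overlapped_total",
		Help: "Total number of overlapped blocks detected in compactor.",
	})

	// In the compactor this would presumably be bumped when an overlapped block is
	// detected; the hunks above only wire the counter through, so here we bump it
	// once for the demo.
	blocksOverlapped.Inc()

	// Gather and print the registered metric to show the counter is live.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
	}
}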
41 changes: 40 additions & 1 deletion pkg/compact/compact.go
@@ -239,6 +239,7 @@ type DefaultGrouper struct {
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
blocksOverlapped prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
compactBlocksFetchConcurrency int
@@ -254,6 +255,7 @@ func NewDefaultGrouper(
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
blocksOverlapped prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
compactBlocksFetchConcurrency int,
@@ -286,6 +288,7 @@
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksOverlapped: blocksOverlapped,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
compactBlocksFetchConcurrency: compactBlocksFetchConcurrency,
@@ -306,6 +309,7 @@ func NewDefaultGrouperWithMetrics(
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
blocksOverlapped prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
compactBlocksFetchConcurrency int,
@@ -323,6 +327,7 @@
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksOverlapped: blocksOverlapped,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
compactBlocksFetchConcurrency: compactBlocksFetchConcurrency,
@@ -355,6 +360,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.blocksOverlapped,
g.hashFunc,
g.blockFilesConcurrency,
g.compactBlocksFetchConcurrency,
@@ -395,6 +401,7 @@ type Group struct {
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
blocksOverlapped prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
compactBlocksFetchConcurrency int
@@ -418,6 +425,7 @@ func NewGroup(
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
blocksOverlapped prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
compactBlocksFetchConcurrency int,
@@ -446,6 +454,7 @@
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
blocksOverlapped: blocksOverlapped,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
compactBlocksFetchConcurrency: compactBlocksFetchConcurrency,
@@ -1078,6 +1087,34 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

// removeOverlappedBlocks keeps only the block whose time range fully covers all
// other blocks in the plan and removes the covered blocks, both from the local
// compaction directory and (when allowed) from the bucket. Partial overlaps are
// not safe to resolve automatically and halt the compactor instead.
func (cg *Group) removeOverlappedBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string, blockDeletableChecker BlockDeletableChecker) error {
	if len(toCompact) == 0 {
		return nil
	}
	// Find the block with the widest [MinTime, MaxTime) range; it must strictly
	// cover every other block in the plan.
	kept := toCompact[0]
	for _, m := range toCompact {
		if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
			level.Warn(cg.logger).Log("msg", "found overlapping block in plan that is not the first", "block", m.String())
			kept = m
		} else if m.MinTime < kept.MinTime || m.MaxTime > kept.MaxTime {
			// The block sticks out past the kept block on one side only: a partial
			// overlap cannot be resolved by dropping a block, so halt for manual intervention.
			return halt(errors.Errorf("found partially overlapping block: %s", m.String()))
		}
	}
	// Delete every block that is fully covered by the kept block.
	for _, m := range toCompact {
		if m.ULID.Compare(kept.ULID) == 0 {
			level.Info(cg.logger).Log("msg", "skip the longest overlapping block", "block", m.String())
			continue
		}
		if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
			return errors.Wrapf(err, "remove old block dir %s", m.String())
		}
		if blockDeletableChecker.CanDelete(cg, m.ULID) {
			if err := block.Delete(ctx, cg.logger, cg.bkt, m.ULID); err != nil {
				return errors.Wrapf(err, "delete overlapped block %s", m.String())
			}
		}
	}
	return nil
}

func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()
@@ -1113,7 +1150,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := groupCompactionBegin

if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err)
// Instead of halting, attempt to remove the overlapped blocks and keep only the one that fully covers the others.
return false, ulid.ULID{}, cg.removeOverlappedBlocks(ctx, toCompact, dir, blockDeletableChecker)
}
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

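As a rough illustration of the rule removeOverlappedBlocks implements above (one block must fully cover every other block in the plan, the covered blocks are dropped, and any partial overlap is treated as a halt-worthy error), here is a small standalone sketch. The span struct and pickKept function are made up for this example; the real code works on *metadata.Meta and also performs the deletions:

package main

import (
	"errors"
	"fmt"
)

// span is a stand-in for a block's time range (metadata.Meta's MinTime/MaxTime).
type span struct {
	name             string
	minTime, maxTime int64
}

// pickKept mirrors the selection loop in removeOverlappedBlocks: start from the
// first block and switch to any block that strictly covers the current candidate;
// a block that extends past the candidate on only one side is a partial overlap.
func pickKept(blocks []span) (span, error) {
	if len(blocks) == 0 {
		return span{}, errors.New("empty plan")
	}
	kept := blocks[0]
	for _, b := range blocks {
		switch {
		case b.minTime < kept.minTime && b.maxTime > kept.maxTime:
			kept = b // b fully covers the current candidate, so b becomes the kept block
		case b.minTime < kept.minTime || b.maxTime > kept.maxTime:
			return span{}, fmt.Errorf("found partially overlapping block: %s", b.name)
		}
	}
	return kept, nil
}

func main() {
	// Block b fully covers block a, so b is kept and a would be deleted.
	kept, err := pickKept([]span{{"a", 10, 90}, {"b", 0, 100}})
	fmt.Println(kept.name, err) // b <nil>

	// Block c sticks out past a on one side only: a partial overlap halts instead of deleting.
	_, err = pickKept([]span{{"a", 0, 100}, {"c", 50, 150}})
	fmt.Println(err) // found partially overlapping block: c
}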
6 changes: 4 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -104,6 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksOverlapped := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)
@@ -140,7 +141,7 @@
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 10, 10)
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, blocksOverlapped, metadata.NoneFunc, 10, 10)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

@@ -208,14 +209,15 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksOverlapped := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, blocksOverlapped, metadata.NoneFunc, 10, 10)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
testutil.Ok(t, err)

6 changes: 3 additions & 3 deletions pkg/compact/compact_test.go
@@ -212,7 +212,7 @@ func TestRetentionProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1)

type retInput struct {
meta []*metadata.Meta
@@ -355,7 +355,7 @@ func TestCompactProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1)

for _, tcase := range []struct {
testName string
@@ -452,7 +452,7 @@ func TestDownsampleProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1)

for _, tcase := range []struct {
testName string
