diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ee8158de1f..c8cc755667 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -107,6 +107,7 @@ type compactMetrics struct { blocksCleaned prometheus.Counter blockCleanupFailures prometheus.Counter blocksMarked *prometheus.CounterVec + blocksOverlapped prometheus.Counter garbageCollectedBlocks prometheus.Counter } @@ -153,6 +154,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com Name: "thanos_compact_blocks_marked_total", Help: "Total number of blocks marked in compactor.", }, []string{"marker", "reason"}) + m.blocksOverlapped = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_blocks_overlapped_total", + Help: "Total number of blocks detected overlapped in compactor.", + }) m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason) m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason) m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "") @@ -357,6 +362,7 @@ func runCompact( compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), + compactMetrics.blocksOverlapped, metadata.HashFunc(conf.hashFunc), conf.blockFilesConcurrency, conf.compactBlocksFetchConcurrency, diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ae0d48bffd..9129bb3efe 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -239,6 +239,7 @@ type DefaultGrouper struct { garbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter + blocksOverlapped prometheus.Counter hashFunc metadata.HashFunc blockFilesConcurrency int compactBlocksFetchConcurrency int @@ -254,6 +255,7 @@ func NewDefaultGrouper( blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, + blocksOverlapped prometheus.Counter, hashFunc metadata.HashFunc, blockFilesConcurrency int, compactBlocksFetchConcurrency int, @@ -286,6 +288,7 @@ func NewDefaultGrouper( blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksOverlapped: blocksOverlapped, hashFunc: hashFunc, blockFilesConcurrency: blockFilesConcurrency, compactBlocksFetchConcurrency: compactBlocksFetchConcurrency, @@ -306,6 +309,7 @@ func NewDefaultGrouperWithMetrics( blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, + blocksOverlapped prometheus.Counter, hashFunc metadata.HashFunc, blockFilesConcurrency int, compactBlocksFetchConcurrency int, @@ -323,6 +327,7 @@ func NewDefaultGrouperWithMetrics( blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksOverlapped: blocksOverlapped, hashFunc: hashFunc, blockFilesConcurrency: blockFilesConcurrency, compactBlocksFetchConcurrency: compactBlocksFetchConcurrency, @@ -355,6 +360,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.garbageCollectedBlocks, g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, + g.blocksOverlapped, g.hashFunc, g.blockFilesConcurrency, g.compactBlocksFetchConcurrency, @@ -395,6 +401,7 @@ type Group struct { groupGarbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter + blocksOverlapped prometheus.Counter hashFunc metadata.HashFunc blockFilesConcurrency int compactBlocksFetchConcurrency int @@ -418,6 +425,7 @@ func NewGroup( groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, + blocksOverlapped prometheus.Counter, hashFunc metadata.HashFunc, blockFilesConcurrency int, compactBlocksFetchConcurrency int, @@ -446,6 +454,7 @@ func NewGroup( groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, + blocksOverlapped: blocksOverlapped, hashFunc: hashFunc, blockFilesConcurrency: blockFilesConcurrency, compactBlocksFetchConcurrency: compactBlocksFetchConcurrency, @@ -1078,6 +1087,34 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } +func (cg *Group) removeOverlappedBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string, blockDeletableChecker BlockDeletableChecker) error { + if len(toCompact) == 0 { + return nil + } + kept := toCompact[0] + for _, m := range toCompact { + if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime { + level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first", "block", m.String()) + kept = m + } else if m.MinTime < kept.MinTime || m.MaxTime > kept.MaxTime { + return halt(errors.Errorf("found partially overlapping block: %s", m.String())) + } + } + for _, m := range toCompact { + if m.ULID.Compare(kept.ULID) == 0 { + level.Info(cg.logger).Log("msg", "skip the longest overlapping block", "block", m.String()) + continue + } + if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { + return errors.Wrapf(err, "remove old block dir %s", m.String()) + } + if blockDeletableChecker.CanDelete(cg, m.ULID) { + return block.Delete(ctx, cg.logger, cg.bkt, m.ULID) + } + } + return nil +} + func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -1113,7 +1150,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin := groupCompactionBegin if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) + level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err) + // instead of halting, we attempt to remove overlapped blocks and only keep the longest one. + return false, ulid.ULID{}, cg.removeOverlappedBlocks(ctx, toCompact, dir, blockDeletableChecker) } level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f1e01ec4f4..7fe4fd24c3 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -104,6 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blocksOverlapped := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) testutil.Ok(t, err) @@ -140,7 +141,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. - grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 10, 10) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, blocksOverlapped, metadata.NoneFunc, 10, 10) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -208,6 +209,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blocksOverlapped := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) testutil.Ok(t, err) @@ -215,7 +217,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Ok(t, err) planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, blocksOverlapped, metadata.NoneFunc, 10, 10) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true) testutil.Ok(t, err) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 1d3764907c..3fbda2d766 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -212,7 +212,7 @@ func TestRetentionProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1) type retInput struct { meta []*metadata.Meta @@ -355,7 +355,7 @@ func TestCompactProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1) for _, tcase := range []struct { testName string @@ -452,7 +452,7 @@ func TestDownsampleProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, temp, "", 1, 1) for _, tcase := range []struct { testName string