From 5fc34353d3fb05ce90562db36b8cd2d8786c830a Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Thu, 21 Mar 2024 15:29:14 -0700 Subject: [PATCH 1/4] [PLAT-104290] remove overlapping blocks into its own file Signed-off-by: Yi Jin --- pkg/compact/compact.go | 56 ++------------------ pkg/compact/overlapping.go | 92 +++++++++++++++++++++++++++++++++ pkg/compact/overlapping_test.go | 13 +++++ 3 files changed, 108 insertions(+), 53 deletions(-) create mode 100644 pkg/compact/overlapping.go create mode 100644 pkg/compact/overlapping_test.go diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7f7fae3084..d0a4dff53b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1031,53 +1031,6 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada return nil } -func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string) error { - if len(toCompact) == 0 { - return nil - } - kept := toCompact[0] - for _, m := range toCompact { - if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime { - level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first", - "first", kept.String(), "block", m.String()) - kept = m - } else if (m.MinTime < kept.MinTime && kept.MinTime < m.MaxTime) || - (m.MinTime < kept.MaxTime && kept.MaxTime < m.MaxTime) { - err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String()) - if cg.enableVerticalCompaction { - level.Error(cg.logger).Log("msg", "best effort to vertical compact", "err", err) - return nil - } else { - return halt(err) - } - } - } - for _, m := range toCompact { - if m.ULID.Compare(kept.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource { - level.Info(cg.logger).Log("msg", "keep this overlapping block", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) - continue - } - cg.overlappingBlocks.Inc() - if err := DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir); err != nil { - return retry(err) - } - } - return retry(errors.Errorf("found overlapping blocks in plan. Only kept %s", kept.String())) -} - -func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { - level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) - if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { - return errors.Wrapf(err, "delete overlapping block %s", m.String()) - } - if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { - return errors.Wrapf(err, "remove old block dir %s", m.String()) - } - return nil -} - // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) @@ -1171,12 +1124,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin := groupCompactionBegin if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { - level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err) - // instead of halting, we attempt to remove overlapped blocks and only keep the longest one. - if newErr := cg.removeOverlappingBlocks(ctx, toCompact, dir); newErr != nil { - return false, ulid.ULID{}, newErr - } + return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) } + toCompact = FilterNilBlocks(toCompact) level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) begin = time.Now() @@ -1407,7 +1357,7 @@ func NewBucketCompactor( planner, comp, DefaultBlockDeletableChecker{}, - DefaultCompactionLifecycleCallback{}, + NewOverlappingCompactionLifecycleCallback(), compactDir, bkt, concurrency, diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go new file mode 100644 index 0000000000..1f211ce553 --- /dev/null +++ b/pkg/compact/overlapping.go @@ -0,0 +1,92 @@ +package compact + +import ( + "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "os" + "path/filepath" +) + +type OverlappingCompactionLifecycleCallback struct { + overlappingBlocks prometheus.Counter + metaDir string +} + +func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { + return &OverlappingCompactionLifecycleCallback{} +} + +// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner +// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others. +func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { + if len(toCompact) == 0 { + return nil + } + previous := 0 + for i, m := range toCompact { + kept := toCompact[previous] + if previous == 0 || m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime { + // no overlapping with previous blocks, skip it + previous = i + continue + } else if m.MinTime < kept.MinTime { + // halt when the assumption is broken, need manual investigation + return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", kept.String(), m.String())) + } + if kept.MaxTime >= m.MaxTime { + level.Warn(logger).Log("msg", "found overlapping block in plan", + "toKeep", kept.String(), "toDelete", m.String()) + cg.overlappingBlocks.Inc() + if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil { + return retry(err) + } + toCompact[i] = nil + } else { + err := errors.Errorf("found partially overlapping block: %s -- %s", kept.String(), m.String()) + if cg.enableVerticalCompaction { + level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) + previous = i // move to next block + } else { + return halt(err) + } + } + } + return nil +} + +func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { + return nil +} + +func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { + return tsdb.DefaultBlockPopulator{}, nil +} + +func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { + for _, b := range blocks { + if b != nil { + res = append(res, b) + } + } + return res +} + +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { + level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), + "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) + if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { + return errors.Wrapf(err, "delete overlapping block %s", m.String()) + } + if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { + return errors.Wrapf(err, "remove old block dir %s", m.String()) + } + return nil +} diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go new file mode 100644 index 0000000000..78f248156c --- /dev/null +++ b/pkg/compact/overlapping_test.go @@ -0,0 +1,13 @@ +package compact + +import ( + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" + "testing" +) + +func TestFilterNilCompact(t *testing.T) { + blocks := []*metadata.Meta{nil, nil} + filtered := FilterNilBlocks(blocks) + testutil.Equals(t, 0, len(filtered)) +} From 843f7128bd1643d2cacedebb662afaaec929a51e Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Thu, 21 Mar 2024 21:29:56 -0700 Subject: [PATCH 2/4] [PLAT-104290] fixed some bugs and added unit tests Signed-off-by: Yi Jin --- pkg/block/fetcher.go | 18 +--- pkg/block/metadata/meta.go | 8 ++ pkg/compact/compact.go | 6 +- pkg/compact/overlapping.go | 70 +++++++------ pkg/compact/overlapping_test.go | 170 +++++++++++++++++++++++++++++++- 5 files changed, 221 insertions(+), 51 deletions(-) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 0420cef7da..875909e182 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -588,13 +588,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) for id, m := range resp.metas { metas[id] = m - if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok { - numBlocksByTenant[tenant]++ - } else { - level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel, - "block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels()) - numBlocksByTenant[metadata.DefaultTenant]++ - } + numBlocksByTenant[m.Thanos.GetTenant()]++ } for tenant, numBlocks := range numBlocksByTenant { @@ -616,15 +610,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter // Therefore, it's skipped to update the gauge. if len(filters) > 0 { for _, m := range metas { - var tenant string - var ok bool - // tenant and replica will have the zero value ("") if the key is not in the map. - if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok { - level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel, - "block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels()) - tenant = metadata.DefaultTenant - } - metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc() + metrics.Assigned.WithLabelValues(m.Thanos.GetTenant(), strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc() } } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 649457906b..9244441c2b 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -115,6 +115,14 @@ func (m *Thanos) ParseExtensions(v any) (any, error) { return ConvertExtensions(m.Extensions, v) } +func (m *Thanos) GetTenant() string { + if tenant, ok := m.Labels[TenantLabel]; ok { + return tenant + } else { + return DefaultTenant + } +} + func (m *Thanos) GetLabels() string { b := new(bytes.Buffer) for k, v := range m.Labels { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index d0a4dff53b..e851016f1e 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -287,7 +287,7 @@ func NewDefaultGrouper( overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_group_overlapping_blocks_total", Help: "Total number of blocks that are overlapping and to be deleted.", - }, []string{"resolution", "level"}), + }, []string{"resolution", "tenant"}), blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, @@ -359,7 +359,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.compactionRunsCompleted.WithLabelValues(resolutionLabel), g.compactionFailures.WithLabelValues(resolutionLabel), g.verticalCompactions.WithLabelValues(resolutionLabel), - g.overlappingBlocks.WithLabelValues(resolutionLabel, fmt.Sprintf("%d", m.Compaction.Level)), + g.overlappingBlocks.WithLabelValues(resolutionLabel, m.Thanos.GetLabels()), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, @@ -1126,7 +1126,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) } - toCompact = FilterNilBlocks(toCompact) + toCompact = FilterRemovedBlocks(toCompact) level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) begin = time.Now() diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 1f211ce553..987510f2cd 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -1,23 +1,23 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package compact import ( "context" + "fmt" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" - "os" - "path/filepath" ) type OverlappingCompactionLifecycleCallback struct { - overlappingBlocks prometheus.Counter - metaDir string } func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { @@ -30,34 +30,45 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte if len(toCompact) == 0 { return nil } - previous := 0 - for i, m := range toCompact { - kept := toCompact[previous] - if previous == 0 || m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime { + prev := 0 + for curr, currB := range toCompact { + prevB := toCompact[prev] + if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { // no overlapping with previous blocks, skip it - previous = i + prev = curr continue - } else if m.MinTime < kept.MinTime { + } else if currB.MinTime < prevB.MinTime { // halt when the assumption is broken, need manual investigation - return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", kept.String(), m.String())) - } - if kept.MaxTime >= m.MaxTime { - level.Warn(logger).Log("msg", "found overlapping block in plan", - "toKeep", kept.String(), "toDelete", m.String()) - cg.overlappingBlocks.Inc() - if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil { - return retry(err) - } - toCompact[i] = nil - } else { - err := errors.Errorf("found partially overlapping block: %s -- %s", kept.String(), m.String()) + return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) + } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { + err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) if cg.enableVerticalCompaction { level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) - previous = i // move to next block + prev = curr + continue } else { return halt(err) } + } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { + continue + } + // prev min <= curr min < prev max + toDelete := -1 + if prevB.MaxTime >= currB.MaxTime { + toDelete = curr + level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block", + "toKeep", prevB.String(), "toDelete", currB.String()) + } else if prevB.MaxTime < currB.MaxTime { + toDelete = prev + prev = curr + level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", + "toKeep", currB.String(), "toDelete", prevB.String()) } + cg.overlappingBlocks.Inc() + if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { + return retry(err) + } + toCompact[toDelete] = nil } return nil } @@ -70,7 +81,7 @@ func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Con return tsdb.DefaultBlockPopulator{}, nil } -func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { +func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { for _, b := range blocks { if b != nil { res = append(res, b) @@ -79,14 +90,13 @@ func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { return res } -func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error { level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) + "level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents), + "resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(), + "series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks) if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { return errors.Wrapf(err, "delete overlapping block %s", m.String()) } - if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { - return errors.Wrapf(err, "remove old block dir %s", m.String()) - } return nil } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index 78f248156c..320fc490f7 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -1,13 +1,179 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package compact import ( + "context" + "testing" + "time" + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" - "testing" + "github.com/thanos-io/thanos/pkg/compact/downsample" ) func TestFilterNilCompact(t *testing.T) { blocks := []*metadata.Meta{nil, nil} - filtered := FilterNilBlocks(blocks) + filtered := FilterRemovedBlocks(blocks) testutil.Equals(t, 0, len(filtered)) + + meta := []*metadata.Meta{ + createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}), + nil, + createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}), + createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}), + nil, + } + testutil.Equals(t, 3, len(FilterRemovedBlocks(meta))) +} + +func TestPreCompactionCallback(t *testing.T) { + reg := prometheus.NewRegistry() + logger := log.NewNopLogger() + bkt := objstore.NewInMemBucket() + temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"}) + group := &Group{ + logger: log.NewNopLogger(), + bkt: bkt, + overlappingBlocks: temp, + } + labels := map[string]string{"a": "1"} + callback := NewOverlappingCompactionLifecycleCallback() + for _, tcase := range []struct { + testName string + input []*metadata.Meta + enableVerticalCompaction bool + expectedSize int + expectedBlocks []*metadata.Meta + err error + }{ + { + testName: "empty blocks", + }, + { + testName: "no overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 3, + }, + { + testName: "duplicated blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 3, + }, + { + testName: "receive blocks", + input: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createReceiveBlockMeta(7, 1, 7, labels), + createReceiveBlockMeta(8, 1, 7, labels), + }, + expectedSize: 3, + }, + { + testName: "receive + compactor blocks", + input: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}), + createReceiveBlockMeta(8, 2, 8, labels), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createReceiveBlockMeta(8, 2, 8, labels), + }, + }, + { + testName: "full overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + }, + }, + { + testName: "part overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + }, + }, + { + testName: "out of order blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction off", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction on", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + enableVerticalCompaction: true, + expectedSize: 3, + }, + } { + if ok := t.Run(tcase.testName, func(t *testing.T) { + group.enableVerticalCompaction = tcase.enableVerticalCompaction + err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input) + if tcase.err != nil { + testutil.NotOk(t, err) + if IsHaltError(tcase.err) { + testutil.Assert(t, IsHaltError(err), "expected halt error") + } else if IsRetryError(tcase.err) { + testutil.Assert(t, IsRetryError(err), "expected retry error") + } + return + } + testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input))) + if tcase.expectedSize != len(tcase.input) { + testutil.Equals(t, tcase.expectedBlocks, FilterRemovedBlocks(tcase.input)) + } + }); !ok { + return + } + } +} + +func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta { + m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{}) + m.Thanos.Source = metadata.ReceiveSource + return m } From f86995a56b189e069ee3cfb348809a41dc67e337 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Thu, 21 Mar 2024 23:00:05 -0700 Subject: [PATCH 3/4] test rollback to previous callbacks Signed-off-by: Yi Jin --- pkg/compact/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index e851016f1e..9ebf6280ce 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1357,7 +1357,7 @@ func NewBucketCompactor( planner, comp, DefaultBlockDeletableChecker{}, - NewOverlappingCompactionLifecycleCallback(), + DefaultCompactionLifecycleCallback{}, compactDir, bkt, concurrency, From cfc344f420d5ad156c30654f8fe3491ec9619b92 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Fri, 22 Mar 2024 10:52:31 -0700 Subject: [PATCH 4/4] [PLAT-102919] add a flag to control overlapping block removal behavior Signed-off-by: Yi Jin --- cmd/thanos/compact.go | 8 ++- pkg/block/metadata/meta.go | 10 +-- pkg/block/metadata/meta_test.go | 2 +- pkg/compact/compact.go | 11 ---- pkg/compact/overlapping.go | 36 ++++++++--- pkg/compact/overlapping_test.go | 105 ++++++++++++++++++-------------- 6 files changed, 100 insertions(+), 72 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 8425df4238..90a12850a9 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -369,12 +369,14 @@ func runCompact( compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) - compactor, err := compact.NewBucketCompactor( + compactor, err := compact.NewBucketCompactorWithCheckerAndCallback( logger, sy, grouper, planner, comp, + compact.DefaultBlockDeletableChecker{}, + compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval), compactDir, insBkt, conf.compactionConcurrency, @@ -710,6 +712,7 @@ type compactConfig struct { maxBlockIndexSize units.Base2Bytes hashFunc string enableVerticalCompaction bool + enableOverlappingRemoval bool dedupFunc string skipBlockWithOutOfOrderChunks bool progressCalculateInterval time.Duration @@ -786,6 +789,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) + cmd.Flag("compact.enable-overlapping-removal", "In house flag to remove overlapping blocks. Turn this on to fix https://github.com/thanos-io/thanos/issues/6775."). + Default("false").BoolVar(&cc.enableOverlappingRemoval) + cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+ "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+ "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag."). diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 9244441c2b..2a2829b6b4 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -9,12 +9,13 @@ package metadata // this package. import ( - "bytes" "encoding/json" "fmt" "io" "os" "path/filepath" + "sort" + "strings" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -124,11 +125,12 @@ func (m *Thanos) GetTenant() string { } func (m *Thanos) GetLabels() string { - b := new(bytes.Buffer) + res := make([]string, 0, len(m.Labels)) for k, v := range m.Labels { - fmt.Fprintf(b, "%s=%s,", k, v) + res = append(res, fmt.Sprintf("%s=%s", k, v)) } - return b.String() + sort.Strings(res) + return strings.Join(res, ",") } // ConvertExtensions converts extensions with `any` type into specific type `v` diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go index af8b1cb01a..ca00c7c365 100644 --- a/pkg/block/metadata/meta_test.go +++ b/pkg/block/metadata/meta_test.go @@ -347,7 +347,7 @@ func TestMeta_GetLabels(t *testing.T) { Labels: map[string]string{"a": "b", "c": "d"}, }, } - testutil.Equals(t, "a=b,c=d,", m.Thanos.GetLabels()) + testutil.Equals(t, "a=b,c=d", m.Thanos.GetLabels()) } type TestExtensions struct { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 9ebf6280ce..52133ec2ea 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -236,7 +236,6 @@ type DefaultGrouper struct { compactionRunsCompleted *prometheus.CounterVec compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec - overlappingBlocks *prometheus.CounterVec garbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter @@ -284,10 +283,6 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"resolution"}), - overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_overlapping_blocks_total", - Help: "Total number of blocks that are overlapping and to be deleted.", - }, []string{"resolution", "tenant"}), blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, @@ -308,7 +303,6 @@ func NewDefaultGrouperWithMetrics( compactionRunsCompleted *prometheus.CounterVec, compactionFailures *prometheus.CounterVec, verticalCompactions *prometheus.CounterVec, - overrlappingBlocks *prometheus.CounterVec, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, @@ -326,7 +320,6 @@ func NewDefaultGrouperWithMetrics( compactionRunsCompleted: compactionRunsCompleted, compactionFailures: compactionFailures, verticalCompactions: verticalCompactions, - overlappingBlocks: overrlappingBlocks, blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, @@ -359,7 +352,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.compactionRunsCompleted.WithLabelValues(resolutionLabel), g.compactionFailures.WithLabelValues(resolutionLabel), g.verticalCompactions.WithLabelValues(resolutionLabel), - g.overlappingBlocks.WithLabelValues(resolutionLabel, m.Thanos.GetLabels()), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, @@ -400,7 +392,6 @@ type Group struct { compactionRunsCompleted prometheus.Counter compactionFailures prometheus.Counter verticalCompactions prometheus.Counter - overlappingBlocks prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter @@ -424,7 +415,6 @@ func NewGroup( compactionRunsCompleted prometheus.Counter, compactionFailures prometheus.Counter, verticalCompactions prometheus.Counter, - overlappingBlocks prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, @@ -453,7 +443,6 @@ func NewGroup( compactionRunsCompleted: compactionRunsCompleted, compactionFailures: compactionFailures, verticalCompactions: verticalCompactions, - overlappingBlocks: overlappingBlocks, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 987510f2cd..adc363ba63 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -11,6 +11,8 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -18,15 +20,24 @@ import ( ) type OverlappingCompactionLifecycleCallback struct { + overlappingBlocks prometheus.Counter } -func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { - return &OverlappingCompactionLifecycleCallback{} +func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback { + if enabled { + return OverlappingCompactionLifecycleCallback{ + overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_total", + Help: "Total number of blocks that are overlapping and to be deleted.", + }), + } + } + return DefaultCompactionLifecycleCallback{} } // PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner // (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others. -func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { +func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { if len(toCompact) == 0 { return nil } @@ -34,11 +45,11 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte for curr, currB := range toCompact { prevB := toCompact[prev] if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { - // no overlapping with previous blocks, skip it + // no overlapping with previous blocks, skip it prev = curr continue } else if currB.MinTime < prevB.MinTime { - // halt when the assumption is broken, need manual investigation + // halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) @@ -50,7 +61,14 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte return halt(err) } } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { - continue + if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples { + level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks", + "prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples, + "curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples, + ) + prev = curr + continue + } } // prev min <= curr min < prev max toDelete := -1 @@ -64,7 +82,7 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", "toKeep", currB.String(), "toDelete", prevB.String()) } - cg.overlappingBlocks.Inc() + o.overlappingBlocks.Inc() if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { return retry(err) } @@ -73,11 +91,11 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte return nil } -func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { +func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { return nil } -func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { +func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { return tsdb.DefaultBlockPopulator{}, nil } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index 320fc490f7..ab6d1051bb 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -6,13 +6,11 @@ package compact import ( "context" "testing" - "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" @@ -24,10 +22,10 @@ func TestFilterNilCompact(t *testing.T) { testutil.Equals(t, 0, len(filtered)) meta := []*metadata.Meta{ - createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), nil, - createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}), - createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3), nil, } testutil.Equals(t, 3, len(FilterRemovedBlocks(meta))) @@ -37,14 +35,11 @@ func TestPreCompactionCallback(t *testing.T) { reg := prometheus.NewRegistry() logger := log.NewNopLogger() bkt := objstore.NewInMemBucket() - temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"}) group := &Group{ - logger: log.NewNopLogger(), - bkt: bkt, - overlappingBlocks: temp, + logger: log.NewNopLogger(), + bkt: bkt, } - labels := map[string]string{"a": "1"} - callback := NewOverlappingCompactionLifecycleCallback() + callback := NewOverlappingCompactionLifecycleCallback(reg, true) for _, tcase := range []struct { testName string input []*metadata.Meta @@ -59,92 +54,108 @@ func TestPreCompactionCallback(t *testing.T) { { testName: "no overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1), }, expectedSize: 3, }, { testName: "duplicated blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + }, + }, + { + testName: "overlap non dup blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), }, - expectedSize: 3, }, { testName: "receive blocks", input: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createReceiveBlockMeta(7, 1, 7, labels), - createReceiveBlockMeta(8, 1, 7, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3), }, expectedSize: 3, }, { testName: "receive + compactor blocks", input: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}), - createReceiveBlockMeta(8, 2, 8, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, expectedSize: 2, expectedBlocks: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createReceiveBlockMeta(8, 2, 8, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, }, { testName: "full overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, expectedSize: 1, expectedBlocks: []*metadata.Meta{ - createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), }, }, { testName: "part overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(1, 1, 2, metadata.CompactorSource, 1), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), }, expectedSize: 2, expectedBlocks: []*metadata.Meta{ - createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), }, }, { testName: "out of order blocks", input: []*metadata.Meta{ - createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, err: halt(errors.Errorf("expect halt error")), }, { testName: "partially overlapping blocks with vertical compaction off", input: []*metadata.Meta{ - createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, err: halt(errors.Errorf("expect halt error")), }, { testName: "partially overlapping blocks with vertical compaction on", input: []*metadata.Meta{ - createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, enableVerticalCompaction: true, expectedSize: 3, @@ -172,8 +183,10 @@ func TestPreCompactionCallback(t *testing.T) { } } -func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta { +func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta { + labels := map[string]string{"a": "1"} m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{}) - m.Thanos.Source = metadata.ReceiveSource + m.Thanos.Source = source + m.Stats.NumSeries = numSeries return m }