diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 0420cef7dac..0f93300e61f 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -591,8 +591,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok { numBlocksByTenant[tenant]++ } else { - level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel, - "block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels()) numBlocksByTenant[metadata.DefaultTenant]++ } } @@ -620,8 +618,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter var ok bool // tenant and replica will have the zero value ("") if the key is not in the map. if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok { - level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel, - "block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels()) tenant = metadata.DefaultTenant } metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc() diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 1f211ce5539..c621d705472 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -1,23 +1,23 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package compact import ( "context" + "fmt" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" - "os" - "path/filepath" ) type OverlappingCompactionLifecycleCallback struct { - overlappingBlocks prometheus.Counter - metaDir string } func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { @@ -30,34 +30,45 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte if len(toCompact) == 0 { return nil } - previous := 0 - for i, m := range toCompact { - kept := toCompact[previous] - if previous == 0 || m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime { + prev := 0 + for curr, currB := range toCompact { + prevB := toCompact[prev] + if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { // no overlapping with previous blocks, skip it - previous = i + prev = curr continue - } else if m.MinTime < kept.MinTime { + } else if currB.MinTime < prevB.MinTime { // halt when the assumption is broken, need manual investigation - return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", kept.String(), m.String())) - } - if kept.MaxTime >= m.MaxTime { - level.Warn(logger).Log("msg", "found overlapping block in plan", - "toKeep", kept.String(), "toDelete", m.String()) - cg.overlappingBlocks.Inc() - if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil { - return retry(err) - } - toCompact[i] = nil - } else { - err := errors.Errorf("found partially overlapping block: %s -- %s", kept.String(), m.String()) + return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) + } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { + err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) if cg.enableVerticalCompaction { level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) - previous = i // move to next block + prev = curr + continue } else { return halt(err) } + } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { + continue + } + // prev min <= curr min < prev max + toDelete := -1 + if prevB.MaxTime >= currB.MaxTime { + toDelete = curr + level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block", + "toKeep", prevB.String(), "toDelete", currB.String()) + } else if prevB.MaxTime < currB.MaxTime { + toDelete = prev + prev = curr + level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", + "toKeep", currB.String(), "toDelete", prevB.String()) } + cg.overlappingBlocks.Inc() + if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { + return retry(err) + } + toCompact[toDelete] = nil } return nil } @@ -79,14 +90,13 @@ func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { return res } -func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error { level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) + "level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents), + "resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(), + "series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks) if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { return errors.Wrapf(err, "delete overlapping block %s", m.String()) } - if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { - return errors.Wrapf(err, "remove old block dir %s", m.String()) - } return nil } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index 78f248156c7..42eda514e1c 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -1,13 +1,179 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package compact import ( + "context" + "testing" + "time" + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" - "testing" + "github.com/thanos-io/thanos/pkg/compact/downsample" ) func TestFilterNilCompact(t *testing.T) { blocks := []*metadata.Meta{nil, nil} filtered := FilterNilBlocks(blocks) testutil.Equals(t, 0, len(filtered)) + + meta := []*metadata.Meta{ + createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}), + nil, + createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}), + createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}), + nil, + } + testutil.Equals(t, 3, len(FilterNilBlocks(meta))) +} + +func TestPreCompactionCallback(t *testing.T) { + reg := prometheus.NewRegistry() + logger := log.NewNopLogger() + bkt := objstore.NewInMemBucket() + temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"}) + group := &Group{ + logger: log.NewNopLogger(), + bkt: bkt, + overlappingBlocks: temp, + } + labels := map[string]string{"a": "1"} + callback := NewOverlappingCompactionLifecycleCallback() + for _, tcase := range []struct { + testName string + input []*metadata.Meta + enableVerticalCompaction bool + expectedSize int + expectedBlocks []*metadata.Meta + err error + }{ + { + testName: "empty blocks", + }, + { + testName: "no overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 3, + }, + { + testName: "duplicated blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 3, + }, + { + testName: "receive blocks", + input: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createReceiveBlockMeta(7, 1, 7, labels), + createReceiveBlockMeta(8, 1, 7, labels), + }, + expectedSize: 3, + }, + { + testName: "receive + compactor blocks", + input: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}), + createReceiveBlockMeta(8, 2, 8, labels), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createReceiveBlockMeta(6, 1, 7, labels), + createReceiveBlockMeta(8, 2, 8, labels), + }, + }, + { + testName: "full overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + }, + }, + { + testName: "part overlapping blocks", + input: []*metadata.Meta{ + createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + }, + }, + { + testName: "out of order blocks", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction off", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction on", + input: []*metadata.Meta{ + createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), + createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + }, + enableVerticalCompaction: true, + expectedSize: 3, + }, + } { + if ok := t.Run(tcase.testName, func(t *testing.T) { + group.enableVerticalCompaction = tcase.enableVerticalCompaction + err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input) + if tcase.err != nil { + testutil.NotOk(t, err) + if IsHaltError(tcase.err) { + testutil.Assert(t, IsHaltError(err), "expected halt error") + } else if IsRetryError(tcase.err) { + testutil.Assert(t, IsRetryError(err), "expected retry error") + } + return + } + testutil.Equals(t, tcase.expectedSize, len(FilterNilBlocks(tcase.input))) + if tcase.expectedSize != len(tcase.input) { + testutil.Equals(t, tcase.expectedBlocks, FilterNilBlocks(tcase.input)) + } + }); !ok { + return + } + } +} + +func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta { + m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{}) + m.Thanos.Source = metadata.ReceiveSource + return m }