Skip to content

Commit

Permalink
[PLAT-104290] fixed some bugs and added unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Mar 22, 2024
1 parent 5fc3435 commit 843f712
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 51 deletions.
18 changes: 2 additions & 16 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas))
for id, m := range resp.metas {
metas[id] = m
if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok {
numBlocksByTenant[tenant]++
} else {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
numBlocksByTenant[metadata.DefaultTenant]++
}
numBlocksByTenant[m.Thanos.GetTenant()]++
}

for tenant, numBlocks := range numBlocksByTenant {
Expand All @@ -616,15 +610,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
// Therefore, it's skipped to update the gauge.
if len(filters) > 0 {
for _, m := range metas {
var tenant string
var ok bool
// tenant and replica will have the zero value ("") if the key is not in the map.
if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok {
level.Error(f.logger).Log("msg", "found blocks without label "+metadata.TenantLabel,
"block", m.String(), "level", m.Compaction.Level, "labels", m.Thanos.GetLabels())
tenant = metadata.DefaultTenant
}
metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
metrics.Assigned.WithLabelValues(m.Thanos.GetTenant(), strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (m *Thanos) ParseExtensions(v any) (any, error) {
return ConvertExtensions(m.Extensions, v)
}

// GetTenant returns the value stored under TenantLabel in the block's labels.
// When the block carries no such label, DefaultTenant is returned instead.
func (m *Thanos) GetTenant() string {
	name, found := m.Labels[TenantLabel]
	if !found {
		return DefaultTenant
	}
	return name
}

func (m *Thanos) GetLabels() string {
b := new(bytes.Buffer)
for k, v := range m.Labels {
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func NewDefaultGrouper(
overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}, []string{"resolution", "level"}),
}, []string{"resolution", "tenant"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand Down Expand Up @@ -359,7 +359,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.compactionRunsCompleted.WithLabelValues(resolutionLabel),
g.compactionFailures.WithLabelValues(resolutionLabel),
g.verticalCompactions.WithLabelValues(resolutionLabel),
g.overlappingBlocks.WithLabelValues(resolutionLabel, fmt.Sprintf("%d", m.Compaction.Level)),
g.overlappingBlocks.WithLabelValues(resolutionLabel, m.Thanos.GetLabels()),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
}
toCompact = FilterNilBlocks(toCompact)
toCompact = FilterRemovedBlocks(toCompact)
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

begin = time.Now()
Expand Down
70 changes: 40 additions & 30 deletions pkg/compact/overlapping.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"os"
"path/filepath"
)

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
metaDir string
}

func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback {
Expand All @@ -30,34 +30,45 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte
if len(toCompact) == 0 {
return nil
}
previous := 0
for i, m := range toCompact {
kept := toCompact[previous]
if previous == 0 || m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime {
prev := 0
for curr, currB := range toCompact {
prevB := toCompact[prev]
if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime {
// no overlap with the previous block, skip it
previous = i
prev = curr
continue
} else if m.MinTime < kept.MinTime {
} else if currB.MinTime < prevB.MinTime {
// halt when the assumption that blocks are ordered by minTime is broken; this needs manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", kept.String(), m.String()))
}
if kept.MaxTime >= m.MaxTime {
level.Warn(logger).Log("msg", "found overlapping block in plan",
"toKeep", kept.String(), "toDelete", m.String())
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil {
return retry(err)
}
toCompact[i] = nil
} else {
err := errors.Errorf("found partially overlapping block: %s -- %s", kept.String(), m.String())
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String()))
} else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
if cg.enableVerticalCompaction {
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err)
previous = i // move to next block
prev = curr
continue
} else {
return halt(err)
}
} else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime {
continue
}
// prev min <= curr min < prev max
toDelete := -1
if prevB.MaxTime >= currB.MaxTime {
toDelete = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block",
"toKeep", prevB.String(), "toDelete", currB.String())
} else if prevB.MaxTime < currB.MaxTime {
toDelete = prev
prev = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block",
"toKeep", currB.String(), "toDelete", prevB.String())
}
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil {
return retry(err)
}
toCompact[toDelete] = nil
}
return nil
}
Expand All @@ -70,7 +81,7 @@ func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Con
return tsdb.DefaultBlockPopulator{}, nil
}

func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
for _, b := range blocks {
if b != nil {
res = append(res, b)
Expand All @@ -79,14 +90,13 @@ func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
return res
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
"level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents),
"resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(),
"series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks)
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}
170 changes: 168 additions & 2 deletions pkg/compact/overlapping_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,179 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact

import (
"context"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"testing"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

func TestFilterNilCompact(t *testing.T) {
blocks := []*metadata.Meta{nil, nil}
filtered := FilterNilBlocks(blocks)
filtered := FilterRemovedBlocks(blocks)
testutil.Equals(t, 0, len(filtered))

meta := []*metadata.Meta{
createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}),
nil,
createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}),
createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}),
nil,
}
testutil.Equals(t, 3, len(FilterRemovedBlocks(meta)))
}

// TestPreCompactionCallback is a table-driven test for
// OverlappingCompactionLifecycleCallback.PreCompactionCallback.
//
// Each case passes a plan (input) to the callback together with a Group backed
// by an in-memory bucket. A case either expects an error of a given kind
// (halt or retry), or expects a certain number of non-nil entries to remain in
// the input slice after the call. Results are read back from tcase.input via
// FilterRemovedBlocks, i.e. the test relies on the callback marking dropped
// blocks in place.
func TestPreCompactionCallback(t *testing.T) {
reg := prometheus.NewRegistry()
logger := log.NewNopLogger()
bkt := objstore.NewInMemBucket()
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"})
// A single Group is shared by all cases; only enableVerticalCompaction is
// reassigned per case inside the subtest.
group := &Group{
logger: log.NewNopLogger(),
bkt: bkt,
overlappingBlocks: temp,
}
labels := map[string]string{"a": "1"}
callback := NewOverlappingCompactionLifecycleCallback()
for _, tcase := range []struct {
testName string
// input is the plan handed to the callback; it is inspected again after the call.
input []*metadata.Meta
enableVerticalCompaction bool
// expectedSize is the number of non-nil blocks expected to remain in input.
expectedSize int
// expectedBlocks is only compared when expectedSize differs from len(input).
expectedBlocks []*metadata.Meta
// err, when non-nil, marks the case as expecting a failure; only its kind
// (halt vs. retry) is checked, never its message.
err error
}{
{
testName: "empty blocks",
},
{
// Each block starts exactly where the previous one ends.
testName: "no overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}),
},
expectedSize: 3,
},
{
// Identical time ranges: all blocks are expected to be kept.
testName: "duplicated blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}),
},
expectedSize: 3,
},
{
// Blocks whose source is metadata.ReceiveSource are all expected to be kept.
testName: "receive blocks",
input: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createReceiveBlockMeta(7, 1, 7, labels),
createReceiveBlockMeta(8, 1, 7, labels),
},
expectedSize: 3,
},
{
// The non-receive block 7 is expected to be dropped; both receive blocks stay.
testName: "receive + compactor blocks",
input: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}),
createReceiveBlockMeta(8, 2, 8, labels),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createReceiveBlockMeta(8, 2, 8, labels),
},
},
{
// Blocks 7 and 8 lie entirely within block 6's range; only block 6 is expected to remain.
testName: "full overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
},
expectedSize: 1,
expectedBlocks: []*metadata.Meta{
createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}),
},
},
{
// Blocks 1 and 2 share a minTime but block 2 ends later; block 1 is expected to be dropped.
testName: "part overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}),
},
},
{
// The second block has a smaller minTime than the first; a halt error is expected.
testName: "out of order blocks",
input: []*metadata.Meta{
createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
},
err: halt(errors.Errorf("expect halt error")),
},
{
// Partial overlap with enableVerticalCompaction left false; a halt error is expected.
testName: "partially overlapping blocks with vertical compaction off",
input: []*metadata.Meta{
createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
},
err: halt(errors.Errorf("expect halt error")),
},
{
// Partial overlap with enableVerticalCompaction set; all blocks are expected to be kept.
testName: "partially overlapping blocks with vertical compaction on",
input: []*metadata.Meta{
createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
},
enableVerticalCompaction: true,
expectedSize: 3,
},
} {
// Stop iterating as soon as one subtest fails (t.Run returns false).
if ok := t.Run(tcase.testName, func(t *testing.T) {
group.enableVerticalCompaction = tcase.enableVerticalCompaction
err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input)
if tcase.err != nil {
testutil.NotOk(t, err)
// Match on the error kind of the expectation, not on its text.
if IsHaltError(tcase.err) {
testutil.Assert(t, IsHaltError(err), "expected halt error")
} else if IsRetryError(tcase.err) {
testutil.Assert(t, IsRetryError(err), "expected retry error")
}
return
}
testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input)))
// Surviving blocks are only compared when the case expects something to be dropped.
if tcase.expectedSize != len(tcase.input) {
testutil.Equals(t, tcase.expectedBlocks, FilterRemovedBlocks(tcase.input))
}
}); !ok {
return
}
}
}

// createReceiveBlockMeta builds a block meta through createBlockMeta, passing
// downsample.ResLevel0 and an empty []uint64, and then sets its Thanos source
// to metadata.ReceiveSource before returning it.
func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta {
	blk := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{})
	blk.Thanos.Source = metadata.ReceiveSource
	return blk
}

0 comments on commit 843f712

Please sign in to comment.