forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[PLAT-104290] remove overlapping blocks into its own file
Signed-off-by: Yi Jin <[email protected]>
- Loading branch information
Showing
3 changed files
with
105 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package compact | ||
|
||
import ( | ||
"context" | ||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/oklog/ulid" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/tsdb" | ||
"github.com/thanos-io/objstore" | ||
"github.com/thanos-io/thanos/pkg/block" | ||
"github.com/thanos-io/thanos/pkg/block/metadata" | ||
"os" | ||
"path/filepath" | ||
) | ||
|
||
type OverlappingCompactionLifecycleCallback struct { | ||
overlappingBlocks prometheus.Counter | ||
metaDir string | ||
} | ||
|
||
func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { | ||
return &OverlappingCompactionLifecycleCallback{} | ||
} | ||
|
||
// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner | ||
// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others. | ||
func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { | ||
if len(toCompact) == 0 { | ||
return nil | ||
} | ||
previous := 0 | ||
for i, m := range toCompact { | ||
kept := toCompact[previous] | ||
if m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime { | ||
// no overlapping with previous blocks, skip it | ||
previous = i | ||
continue | ||
} | ||
if kept.MaxTime >= m.MaxTime { | ||
level.Warn(logger).Log("msg", "found overlapping block in plan", | ||
"toKeep", kept.String(), "toDelete", m.String()) | ||
cg.overlappingBlocks.Inc() | ||
if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil { | ||
return retry(err) | ||
} | ||
toCompact[i] = nil | ||
} else { | ||
err := errors.Errorf("found partially overlapping block: %s vs %s", kept.String(), m.String()) | ||
if cg.enableVerticalCompaction { | ||
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) | ||
previous = i // move to next block | ||
} else { | ||
return halt(err) | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { | ||
return nil | ||
} | ||
|
||
func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { | ||
return tsdb.DefaultBlockPopulator{}, nil | ||
} | ||
|
||
func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { | ||
for _, b := range blocks { | ||
if b != nil { | ||
res = append(res, b) | ||
} | ||
} | ||
return res | ||
} | ||
|
||
func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { | ||
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), | ||
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) | ||
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { | ||
return errors.Wrapf(err, "delete overlapping block %s", m.String()) | ||
} | ||
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { | ||
return errors.Wrapf(err, "remove old block dir %s", m.String()) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package compact | ||
|
||
import ( | ||
"github.com/efficientgo/core/testutil" | ||
"github.com/thanos-io/thanos/pkg/block/metadata" | ||
"testing" | ||
) | ||
|
||
func TestFilterNilCompact(t *testing.T) { | ||
blocks := []*metadata.Meta{nil, nil} | ||
filtered := FilterNilBlocks(blocks) | ||
testutil.Equals(t, 0, len(filtered)) | ||
} |