diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 98f9699a43..f2a8290afe 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -436,6 +436,12 @@ func runCompact( level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) } + retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants) + if err != nil { + level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err) + return err + } + var cleanMtx sync.Mutex // TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex. cleanPartialMarked := func() error { @@ -534,6 +540,10 @@ func runCompact( return errors.Wrap(err, "sync before retention") } + if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil { + return errors.Wrap(err, "retention by tenant failed") + } + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil { return errors.Wrap(err, "retention failed") } @@ -726,6 +736,7 @@ type compactConfig struct { objStore extflag.PathOrContent consistencyDelay time.Duration retentionRaw, retentionFiveMin, retentionOneHr model.Duration + retentionTenants *[]string wait bool waitInterval time.Duration disableDownsampling bool @@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { Default("0d").SetValue(&cc.retentionFiveMin) cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever"). Default("0d").SetValue(&cc.retentionOneHr) + cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings() // TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266. cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index 0a351a5fab..416178eb74 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -81,6 +81,8 @@ const ( OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk" // DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average. DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction" + // TenantRetentionExpired is a reason to delete block as it's per tenant retention is expired. + TenantRetentionExpired = "tenant-retention-expired" ) // NoCompactMark marker stores reason of block being excluded from compaction if needed. diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 40ea0a1a72..3d95640241 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -6,6 +6,7 @@ package compact import ( "context" "fmt" + "regexp" "time" "github.com/go-kit/log" @@ -13,12 +14,19 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) +const ( + // tenantRetentionRegex is the regex pattern for parsing tenant retention. + // valid format is `:(|d)` where > 0. + tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$` +) + // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime. // A value of 0 disables the retention for its resolution. func ApplyRetentionPolicyByResolution( @@ -47,3 +55,79 @@ func ApplyRetentionPolicyByResolution( level.Info(logger).Log("msg", "optional retention apply done") return nil } + +type RetentionPolicy struct { + CutoffDate time.Time + RetentionDuration time.Duration +} + +func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool { + if r.CutoffDate.IsZero() { + return time.Now().After(blockMaxTime.Add(r.RetentionDuration)) + } + return r.CutoffDate.After(blockMaxTime) +} + +func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) { + pattern := regexp.MustCompile(tenantRetentionRegex) + retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants)) + for _, tenantRetention := range retentionTenants { + matches := pattern.FindStringSubmatch(tenantRetention) + invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `:(|d)`", tenantRetention) + if len(matches) != 5 { + return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches)) + } + tenant := matches[1] + var policy RetentionPolicy + if _, ok := retentionByTenant[tenant]; ok { + return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant) + } + if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); matches[3] != "" { + if err != nil { + return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err) + } + policy.CutoffDate = cutoffDate + } + if duration, err := model.ParseDuration(matches[4]); matches[4] != "" { + if err != nil { + return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err) + } else if duration == 0 { + return nil, errors.Wrapf(invalidFormat, "duration must be greater than 0") + } + policy.RetentionDuration = time.Duration(duration) + } + level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy)) + retentionByTenant[tenant] = policy + } + return retentionByTenant, nil +} + +// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime. +func ApplyRetentionPolicyByTenant( + ctx context.Context, + logger log.Logger, + bkt objstore.Bucket, + metas map[ulid.ULID]*metadata.Meta, + retentionByTenant map[string]RetentionPolicy, + blocksMarkedForDeletion prometheus.Counter) error { + if len(retentionByTenant) == 0 { + level.Info(logger).Log("msg", "tenant retention is disabled due to no policy") + return nil + } + level.Info(logger).Log("msg", "start tenant retention") + for id, m := range metas { + policy, ok := retentionByTenant[m.Thanos.GetTenant()] + if !ok { + continue + } + maxTime := time.Unix(m.MaxTime/1000, 0) + if policy.isExpired(maxTime) { + level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String()) + if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil { + return errors.Wrap(err, "delete block") + } + } + } + level.Info(logger).Log("msg", "tenant retention apply done") + return nil +} diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index d883f23fea..e4e404b8e4 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -306,3 +306,266 @@ func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxT testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@"))) testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@"))) } + +func TestParseRetentionPolicyByTenant(t *testing.T) { + t.Parallel() + + for _, tt := range []struct { + name string + retentionTenants []string + expected map[string]compact.RetentionPolicy + expectedErr bool + }{ + { + "empty", + []string{}, + map[string]compact.RetentionPolicy{}, + false, + }, + { + "valid", + []string{"tenant-1:2021-01-01", "tenant-2:11d"}, + map[string]compact.RetentionPolicy{ + "tenant-1": { + CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: time.Duration(0), + }, + "tenant-2": { + CutoffDate: time.Time{}, + RetentionDuration: 11 * 24 * time.Hour, + }, + }, + false, + }, + { + "invalid tenant", + []string{"tenant1:2021-01-01", "tenant#2:1d"}, + nil, + true, + }, + { + "invalid date", + []string{"tenant1:2021-010-01", "tenant2:1d"}, + nil, + true, + }, + { + "invalid duration", + []string{"tenant1:2021-01-01", "tenant2:1w"}, + nil, + true, + }, + { + "invalid duration which is 0", + []string{"tenant1:2021-01-01", "tenant2:0d"}, + nil, + true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants) + if (err != nil) != tt.expectedErr { + t.Errorf("ParseRetentionPolicyByTenant() error = %v, wantErr %v", err, tt.expectedErr) + return + } + testutil.Equals(t, got, tt.expected) + }) + } +} + +func TestApplyRetentionPolicyByTenant(t *testing.T) { + t.Parallel() + + type testBlock struct { + id, tenant string + minTime time.Time + maxTime time.Time + } + + logger := log.NewNopLogger() + ctx := context.TODO() + + for _, tt := range []struct { + name string + blocks []testBlock + retentionByTenant map[string]compact.RetentionPolicy + want []string + wantErr bool + }{ + { + "empty bucket", + []testBlock{}, + map[string]compact.RetentionPolicy{}, + []string{}, + false, + }, + { + "tenant retention disabled", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-2", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + }, + map[string]compact.RetentionPolicy{}, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + }, + false, + }, + { + "tenant retention with duration", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-1", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-2", + time.Now().Add(-24 * time.Hour), + time.Now().Add(-23 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-2", + time.Now().Add(-23 * time.Hour), + time.Now().Add(-6 * time.Hour), + }, + }, + map[string]compact.RetentionPolicy{ + "tenant-2": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "tenant retention with cutoff date", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-1", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-1", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-2", + time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC), + time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC), + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-2", + time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + map[string]compact.RetentionPolicy{ + "tenant-2": { + CutoffDate: time.Date(2024, 10, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: 0, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", + "01CPHBEX20729MJQZXE3W0BW49/", + "01CPHBEX20729MJQZXE3W0BW50/", + }, + false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + for _, b := range tt.blocks { + uploadTenantBlock(t, bkt, b.id, b.tenant, b.minTime, b.maxTime) + } + + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil) + testutil.Ok(t, err) + + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + + metas, _, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + + if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, bkt, metas, tt.retentionByTenant, blocksMarkedForDeletion); (err != nil) != tt.wantErr { + t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr) + } + + got := []string{} + gotMarkedBlocksCount := 0.0 + testutil.Ok(t, bkt.Iter(context.TODO(), "", func(name string) error { + exists, err := bkt.Exists(ctx, filepath.Join(name, metadata.DeletionMarkFilename)) + if err != nil { + return err + } + if !exists { + got = append(got, name) + return nil + } + gotMarkedBlocksCount += 1.0 + return nil + })) + + testutil.Equals(t, got, tt.want) + testutil.Equals(t, gotMarkedBlocksCount, promtest.ToFloat64(blocksMarkedForDeletion)) + }) + } +} + +func uploadTenantBlock(t *testing.T, bkt objstore.Bucket, id, tenant string, minTime, maxTime time.Time) { + t.Helper() + meta1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustParse(id), + MinTime: minTime.Unix() * 1000, + MaxTime: maxTime.Unix() * 1000, + Version: 1, + }, + Thanos: metadata.Thanos{ + Labels: map[string]string{ + metadata.TenantLabel: tenant, + }, + }, + } + + b, err := json.Marshal(meta1) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(context.Background(), id+"/meta.json", bytes.NewReader(b))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000001", strings.NewReader("@test-data@"))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@"))) + testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@"))) +}