Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

per tenant retention #116

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants)
if err != nil {
level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err)
return err
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
Expand Down Expand Up @@ -534,6 +540,10 @@ func runCompact(
return errors.Wrap(err, "sync before retention")
}

if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
return errors.Wrap(err, "retention by tenant failed")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}
Expand Down Expand Up @@ -726,6 +736,7 @@ type compactConfig struct {
objStore extflag.PathOrContent
consistencyDelay time.Duration
retentionRaw, retentionFiveMin, retentionOneHr model.Duration
retentionTenants *[]string
wait bool
waitInterval time.Duration
disableDownsampling bool
Expand Down Expand Up @@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("0d").SetValue(&cc.retentionFiveMin)
cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&cc.retentionOneHr)
cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings()

// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Expand Down
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
// DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average.
DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
// TenantRetentionExpired is a reason to delete block as it's per tenant retention is expired.
TenantRetentionExpired = "tenant-retention-expired"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
78 changes: 78 additions & 0 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@ package compact
import (
"context"
"fmt"
"regexp"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

const (
tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give some examples in a piece of code comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

examples in unit tests already

)

// ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
// A value of 0 disables the retention for its resolution.
func ApplyRetentionPolicyByResolution(
Expand Down Expand Up @@ -47,3 +53,75 @@ func ApplyRetentionPolicyByResolution(
level.Info(logger).Log("msg", "optional retention apply done")
return nil
}

type RetentionPolicy struct {
CutoffDate time.Time
RetentionDuration time.Duration
}

func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool {
if r.CutoffDate.IsZero() {
return time.Now().After(blockMaxTime.Add(r.RetentionDuration))
}
return r.CutoffDate.After(blockMaxTime)
}

func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) {
pattern := regexp.MustCompile(tenantRetentionRegex)
retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants))
for _, tenantRetention := range retentionTenants {
matches := pattern.FindStringSubmatch(tenantRetention)
invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `<tenant>:(<yyyy-mm-dd>|<duration>d)`", tenantRetention)
if len(matches) != 5 {
return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches))
}
tenant := matches[1]
var policy RetentionPolicy
if _, ok := retentionByTenant[tenant]; ok {
return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
}
if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
} else if matches[3] != "" {
policy.CutoffDate = cutoffDate
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't need else if{} because the function returns in if{}.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still need this, matches[3] can be empty, we allow optional cutoffDate or a retention duration

if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
} else if matches[4] != "" {
policy.RetentionDuration = time.Duration(duration)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy))
retentionByTenant[tenant] = policy
}
return retentionByTenant, nil
}

// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
func ApplyRetentionPolicyByTenant(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan to add unit test for this as well

ctx context.Context,
logger log.Logger,
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
retentionByTenant map[string]RetentionPolicy,
blocksMarkedForDeletion prometheus.Counter) error {
if len(retentionByTenant) == 0 {
level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
return nil
}
level.Info(logger).Log("msg", "start tenant retention")
for id, m := range metas {
policy, ok := retentionByTenant[m.Thanos.GetTenant()]
if !ok {
continue
}
maxTime := time.Unix(m.MaxTime/1000, 0)
if policy.isExpired(maxTime) {
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "delete block")
}
}
}
level.Info(logger).Log("msg", "tenant retention apply done")
return nil
}
63 changes: 61 additions & 2 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -20,8 +21,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -282,6 +281,66 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
}
}

func TestParseRetentionPolicyByTenant(t *testing.T) {
t.Parallel()

for _, tt := range []struct {
name string
retentionTenants []string
expected map[string]compact.RetentionPolicy
expectedErr bool
}{
{
"empty",
[]string{},
map[string]compact.RetentionPolicy{},
false,
},
{
"valid",
[]string{"tenant-1:2021-01-01", "tenant-2:3d"},
map[string]compact.RetentionPolicy{
"tenant-1": {
CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
RetentionDuration: time.Duration(0),
},
"tenant-2": {
CutoffDate: time.Time{},
RetentionDuration: 3 * 24 * time.Hour,
},
},
false,
},
{
"invalid tenant",
[]string{"tenant1:2021-01-01", "tenant#2:1d"},
nil,
true,
},
{
"invalid date",
[]string{"tenant1:2021-010-01", "tenant2:1d"},
nil,
true,
},
{
"invalid duration",
[]string{"tenant1:2021-01-01", "tenant2:1w"},
nil,
true,
},
} {
t.Run(tt.name, func(t *testing.T) {
got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants)
if (err != nil) != tt.expectedErr {
t.Errorf("ParseRetentionPolicyByTenant() error = %v, wantErr %v", err, tt.expectedErr)
return
}
testutil.Equals(t, got, tt.expected)
})
}
}

func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
meta1 := metadata.Meta{
Expand Down
Loading