Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Dec 17, 2024
1 parent c3c1e59 commit 2cc4540
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 12 deletions.
18 changes: 12 additions & 6 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
)

const (
// tenantRetentionRegex is the regex pattern for parsing tenant retention.
// valid format is `<tenant>:(<yyyy-mm-dd>|<duration>d)` where <duration> > 0.
tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
)

Expand Down Expand Up @@ -80,14 +82,18 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string)
if _, ok := retentionByTenant[tenant]; ok {
return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
}
if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); err != nil && matches[3] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
} else if matches[3] != "" {
if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); matches[3] != "" {
if err != nil {
return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
}
policy.CutoffDate = cutoffDate
}
if duration, err := model.ParseDuration(matches[4]); err != nil && matches[4] != "" {
return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
} else if matches[4] != "" {
if duration, err := model.ParseDuration(matches[4]); matches[4] != "" {
if err != nil {
return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
} else if duration == 0 {
return nil, errors.Wrapf(invalidFormat, "duration must be greater than 0")
}
policy.RetentionDuration = time.Duration(duration)
}
level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy))
Expand Down
216 changes: 210 additions & 6 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -21,6 +20,8 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -281,6 +282,31 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
}
}

func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
meta1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse(id),
MinTime: minTime.Unix() * 1000,
MaxTime: maxTime.Unix() * 1000,
Version: 1,
},
Thanos: metadata.Thanos{
Downsample: metadata.ThanosDownsample{
Resolution: resolutionLevel,
},
},
}

b, err := json.Marshal(meta1)
testutil.Ok(t, err)

testutil.Ok(t, bkt.Upload(context.Background(), id+"/meta.json", bytes.NewReader(b)))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000001", strings.NewReader("@test-data@")))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@")))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@")))
}

func TestParseRetentionPolicyByTenant(t *testing.T) {
t.Parallel()

Expand All @@ -298,15 +324,15 @@ func TestParseRetentionPolicyByTenant(t *testing.T) {
},
{
"valid",
[]string{"tenant-1:2021-01-01", "tenant-2:3d"},
[]string{"tenant-1:2021-01-01", "tenant-2:11d"},
map[string]compact.RetentionPolicy{
"tenant-1": {
CutoffDate: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
RetentionDuration: time.Duration(0),
},
"tenant-2": {
CutoffDate: time.Time{},
RetentionDuration: 3 * 24 * time.Hour,
RetentionDuration: 11 * 24 * time.Hour,
},
},
false,
Expand All @@ -329,6 +355,12 @@ func TestParseRetentionPolicyByTenant(t *testing.T) {
nil,
true,
},
{
"invalid duration which is 0",
[]string{"tenant1:2021-01-01", "tenant2:0d"},
nil,
true,
},
} {
t.Run(tt.name, func(t *testing.T) {
got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants)
Expand All @@ -341,7 +373,179 @@ func TestParseRetentionPolicyByTenant(t *testing.T) {
}
}

func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
func TestApplyRetentionPolicyByTenant(t *testing.T) {
t.Parallel()

type testBlock struct {
id, tenant string
minTime time.Time
maxTime time.Time
}

logger := log.NewNopLogger()
ctx := context.TODO()

for _, tt := range []struct {
name string
blocks []testBlock
retentionByTenant map[string]compact.RetentionPolicy
want []string
wantErr bool
}{
{
"empty bucket",
[]testBlock{},
map[string]compact.RetentionPolicy{},
[]string{},
false,
},
{
"tenant retention disabled",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-1",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-2",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
},
},
map[string]compact.RetentionPolicy{},
[]string{
"01CPHBEX20729MJQZXE3W0BW48/",
"01CPHBEX20729MJQZXE3W0BW49/",
},
false,
},
{
"tenant retention with duration",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-1",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-1",
time.Now().Add(-2 * 24 * time.Hour),
time.Now().Add(-24 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW50",
"tenant-2",
time.Now().Add(-24 * time.Hour),
time.Now().Add(-23 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW51",
"tenant-2",
time.Now().Add(-23 * time.Hour),
time.Now().Add(-6 * time.Hour),
},
},
map[string]compact.RetentionPolicy{
"tenant-2": {
CutoffDate: time.Time{},
RetentionDuration: 10 * time.Hour,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW48/",
"01CPHBEX20729MJQZXE3W0BW49/",
"01CPHBEX20729MJQZXE3W0BW51/",
},
false,
},
{
"tenant retention with cutoff date",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-1",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-1",
time.Now().Add(-2 * 24 * time.Hour),
time.Now().Add(-24 * time.Hour),
},
{
"01CPHBEX20729MJQZXE3W0BW50",
"tenant-2",
time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC),
time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC),
},
{
"01CPHBEX20729MJQZXE3W0BW51",
"tenant-2",
time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
map[string]compact.RetentionPolicy{
"tenant-2": {
CutoffDate: time.Date(2024, 10, 1, 0, 0, 0, 0, time.UTC),
RetentionDuration: 0,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW48/",
"01CPHBEX20729MJQZXE3W0BW49/",
"01CPHBEX20729MJQZXE3W0BW50/",
},
false,
},
} {
t.Run(tt.name, func(t *testing.T) {
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
for _, b := range tt.blocks {
uploadTenantBlock(t, bkt, b.id, b.tenant, b.minTime, b.maxTime)
}

baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)

if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, bkt, metas, tt.retentionByTenant, blocksMarkedForDeletion); (err != nil) != tt.wantErr {
t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr)
}

got := []string{}
gotMarkedBlocksCount := 0.0
testutil.Ok(t, bkt.Iter(context.TODO(), "", func(name string) error {
exists, err := bkt.Exists(ctx, filepath.Join(name, metadata.DeletionMarkFilename))
if err != nil {
return err
}
if !exists {
got = append(got, name)
return nil
}
gotMarkedBlocksCount += 1.0
return nil
}))

testutil.Equals(t, got, tt.want)
testutil.Equals(t, gotMarkedBlocksCount, promtest.ToFloat64(blocksMarkedForDeletion))
})
}
}

func uploadTenantBlock(t *testing.T, bkt objstore.Bucket, id, tenant string, minTime, maxTime time.Time) {
t.Helper()
meta1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
Expand All @@ -351,8 +555,8 @@ func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxT
Version: 1,
},
Thanos: metadata.Thanos{
Downsample: metadata.ThanosDownsample{
Resolution: resolutionLevel,
Labels: map[string]string{
metadata.TenantLabel: tenant,
},
},
}
Expand Down

0 comments on commit 2cc4540

Please sign in to comment.