From 049adc75c3c6c4d8ca6a5fb64f5c9a0e7800f183 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Tue, 12 Mar 2024 21:01:52 -0700 Subject: [PATCH] delete obsolete blocks that doesn't container __tenant__ Signed-off-by: Yi Jin --- pkg/block/fetcher.go | 22 ++++++------------- pkg/block/metadata/meta.go | 7 ++++++ pkg/compact/compact.go | 45 +++++++++++++++++++++++++++++++------- 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 3708aa5ffd..682cbe1f9b 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -103,11 +103,6 @@ const ( // Modified label values. replicaRemovedMeta = "replica-label-removed" - - tenantLabel = "__tenant__" - defautTenant = "__not_set__" - replicaLabel = "__replica__" - defaultReplica = "" ) func NewBaseFetcherMetrics(reg prometheus.Registerer) *BaseFetcherMetrics { @@ -187,7 +182,7 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt Name: "assigned", Help: "Number of metadata blocks assigned to this pod after all filters.", }, - []string{"tenant", "level", "replica"}, + []string{"tenant", "level"}, // No init label values is fine. The only downside is those guages won't be reset to 0, but it's fine for the use case. ) return &m @@ -593,10 +588,10 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) for id, m := range resp.metas { metas[id] = m - if tenant, ok := m.Thanos.Labels[tenantLabel]; ok { + if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok { numBlocksByTenant[tenant]++ } else { - numBlocksByTenant[defautTenant]++ + numBlocksByTenant[metadata.DefaultTenant]++ } } @@ -619,16 +614,13 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter // Therefore, it's skipped to update the gauge. if len(filters) > 0 { for _, m := range metas { - var tenant, replica string + var tenant string var ok bool // tenant and replica will have the zero value ("") if the key is not in the map. - if tenant, ok = m.Thanos.Labels[tenantLabel]; !ok { - tenant = defautTenant - } - if replica, ok = m.Thanos.Labels[replicaLabel]; !ok { - replica = defaultReplica + if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok { + tenant = metadata.DefaultTenant } - metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level), replica).Inc() + metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc() } } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index a479ee242d..f142da4f15 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -54,6 +54,13 @@ const ( ThanosVersion1 = 1 ) +const ( + TenantLabel = "__tenant__" + + ObsoletedTenantLabel = "tenant_id" + DefaultTenant = "__not_set__" +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. type Meta struct { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index df3b76625f..454c322f49 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -70,6 +70,7 @@ type SyncerMetrics struct { GarbageCollectionFailures prometheus.Counter GarbageCollectionDuration prometheus.Observer BlocksMarkedForDeletion prometheus.Counter + ObsoletedBlocks prometheus.Counter } func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) *SyncerMetrics { @@ -89,6 +90,10 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag Help: "Time it took to perform garbage collection iteration.", Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, }) + m.ObsoletedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_obsoleted_blocks_total", + Help: "Total number of blocks that are obsoleted and deleted", + }) m.BlocksMarkedForDeletion = blocksMarkedForDeletion @@ -216,6 +221,21 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { return nil } +func (s *Syncer) DeleteObsoletedBlocks(ctx context.Context, dir string) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, m := range s.blocks { + if _, ok := m.Thanos.Labels[metadata.ObsoletedTenantLabel]; ok { + s.metrics.ObsoletedBlocks.Inc() + if err := DeleteBlockNow(ctx, s.logger, s.bkt, m, dir); err != nil { + level.Error(s.logger).Log("msg", "deleting block failed", "block", m.String(), "err", err) + } + } + } + return nil +} + // Grouper is responsible to group all known blocks into sub groups which are safe to be // compacted concurrently. type Grouper interface { @@ -1053,14 +1073,19 @@ func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metad continue } cg.overlappingBlocks.Inc() - level.Warn(cg.logger).Log("msg", "delete overlapping block immediately", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels) - if err := block.Delete(ctx, cg.logger, cg.bkt, m.ULID); err != nil { - return errors.Wrapf(err, "delete overlapping block %s", m.String()) - } - if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { - return errors.Wrapf(err, "remove old block dir %s", m.String()) - } + return DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir) + } + return nil +} + +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { + level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), + "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels) + if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { + return errors.Wrapf(err, "delete overlapping block %s", m.String()) + } + if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { + return errors.Wrapf(err, "remove old block dir %s", m.String()) } return nil } @@ -1517,6 +1542,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { return errors.Wrap(err, "garbage") } + if err := c.sy.DeleteObsoletedBlocks(ctx, c.compactDir); err != nil { + return errors.Wrap(err, "delete obsoleted blocks") + } + groups, err := c.grouper.Groups(c.sy.Metas()) if err != nil { return errors.Wrap(err, "build compaction groups")