Skip to content

Commit

Permalink
delete obsolete blocks that doesn't container __tenant__
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Mar 13, 2024
1 parent 38e888c commit 049adc7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 23 deletions.
22 changes: 7 additions & 15 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ const (

// Modified label values.
replicaRemovedMeta = "replica-label-removed"

tenantLabel = "__tenant__"
defautTenant = "__not_set__"
replicaLabel = "__replica__"
defaultReplica = ""
)

func NewBaseFetcherMetrics(reg prometheus.Registerer) *BaseFetcherMetrics {
Expand Down Expand Up @@ -187,7 +182,7 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt
Name: "assigned",
Help: "Number of metadata blocks assigned to this pod after all filters.",
},
[]string{"tenant", "level", "replica"},
[]string{"tenant", "level"},
// No init label values is fine. The only downside is those guages won't be reset to 0, but it's fine for the use case.
)
return &m
Expand Down Expand Up @@ -593,10 +588,10 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas))
for id, m := range resp.metas {
metas[id] = m
if tenant, ok := m.Thanos.Labels[tenantLabel]; ok {
if tenant, ok := m.Thanos.Labels[metadata.TenantLabel]; ok {
numBlocksByTenant[tenant]++
} else {
numBlocksByTenant[defautTenant]++
numBlocksByTenant[metadata.DefaultTenant]++
}
}

Expand All @@ -619,16 +614,13 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
// Therefore, it's skipped to update the gauge.
if len(filters) > 0 {
for _, m := range metas {
var tenant, replica string
var tenant string
var ok bool
// tenant and replica will have the zero value ("") if the key is not in the map.
if tenant, ok = m.Thanos.Labels[tenantLabel]; !ok {
tenant = defautTenant
}
if replica, ok = m.Thanos.Labels[replicaLabel]; !ok {
replica = defaultReplica
if tenant, ok = m.Thanos.Labels[metadata.TenantLabel]; !ok {
tenant = metadata.DefaultTenant
}
metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level), replica).Inc()
metrics.Assigned.WithLabelValues(tenant, strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc()
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ const (
ThanosVersion1 = 1
)

const (
TenantLabel = "__tenant__"

ObsoletedTenantLabel = "tenant_id"
DefaultTenant = "__not_set__"
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
// extends it by Thanos-specific fields.
type Meta struct {
Expand Down
45 changes: 37 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type SyncerMetrics struct {
GarbageCollectionFailures prometheus.Counter
GarbageCollectionDuration prometheus.Observer
BlocksMarkedForDeletion prometheus.Counter
ObsoletedBlocks prometheus.Counter
}

func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) *SyncerMetrics {
Expand All @@ -89,6 +90,10 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag
Help: "Time it took to perform garbage collection iteration.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
})
m.ObsoletedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_obsoleted_blocks_total",
Help: "Total number of blocks that are obsoleted and deleted",
})

m.BlocksMarkedForDeletion = blocksMarkedForDeletion

Expand Down Expand Up @@ -216,6 +221,21 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
return nil
}

func (s *Syncer) DeleteObsoletedBlocks(ctx context.Context, dir string) error {
s.mtx.Lock()
defer s.mtx.Unlock()

for _, m := range s.blocks {
if _, ok := m.Thanos.Labels[metadata.ObsoletedTenantLabel]; ok {
s.metrics.ObsoletedBlocks.Inc()
if err := DeleteBlockNow(ctx, s.logger, s.bkt, m, dir); err != nil {
level.Error(s.logger).Log("msg", "deleting block failed", "block", m.String(), "err", err)
}
}
}
return nil
}

// Grouper is responsible to group all known blocks into sub groups which are safe to be
// compacted concurrently.
type Grouper interface {
Expand Down Expand Up @@ -1053,14 +1073,19 @@ func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metad
continue
}
cg.overlappingBlocks.Inc()
level.Warn(cg.logger).Log("msg", "delete overlapping block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
if err := block.Delete(ctx, cg.logger, cg.bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir)
}
return nil
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}
Expand Down Expand Up @@ -1517,6 +1542,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
return errors.Wrap(err, "garbage")
}

if err := c.sy.DeleteObsoletedBlocks(ctx, c.compactDir); err != nil {
return errors.Wrap(err, "delete obsoleted blocks")
}

groups, err := c.grouper.Groups(c.sy.Metas())
if err != nil {
return errors.Wrap(err, "build compaction groups")
Expand Down

0 comments on commit 049adc7

Please sign in to comment.