Skip to content

Commit

Permalink
add counters
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Mar 12, 2024
1 parent b9d180b commit 5e8d908
Showing 1 changed file with 34 additions and 33 deletions.
67 changes: 34 additions & 33 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,39 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada
return nil
}

func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string, blockDeletableChecker BlockDeletableChecker) error {
if len(toCompact) == 0 {
return nil
}
kept := toCompact[0]
for _, m := range toCompact {
if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first",
"first", kept.String(), "block", m.String())
kept = m
} else if m.MinTime < kept.MinTime || m.MaxTime > kept.MaxTime {
return halt(errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String()))
}
}
for _, m := range toCompact {
if m.ULID.Compare(kept.ULID) == 0 {
level.Info(cg.logger).Log("msg", "skip the longest overlapping block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
continue
}
cg.blocksOverlapped.Inc()
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
if blockDeletableChecker.CanDelete(cg, m.ULID) {
level.Warn(cg.logger).Log("msg", "deleting overlapping block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
return block.Delete(ctx, cg.logger, cg.bkt, m.ULID)
}
}
return nil
}

// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error {
ie, ok := errors.Cause(issue347Err).(Issue347Error)
Expand Down Expand Up @@ -1087,38 +1120,6 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) removeOverlappedBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string, blockDeletableChecker BlockDeletableChecker) error {
if len(toCompact) == 0 {
return nil
}
kept := toCompact[0]
for _, m := range toCompact {
if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
level.Warn(cg.logger).Log("msg", "found overlapped block in plan that are not the first",
"first", kept.String(), "block", m.String())
kept = m
} else if m.MinTime < kept.MinTime || m.MaxTime > kept.MaxTime {
return halt(errors.Errorf("found partially overlapped block: %s vs %s", m.String(), kept.String()))
}
}
for _, m := range toCompact {
if m.ULID.Compare(kept.ULID) == 0 {
level.Info(cg.logger).Log("msg", "skip the longest overlapped block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
continue
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
if blockDeletableChecker.CanDelete(cg, m.ULID) {
level.Warn(cg.logger).Log("msg", "deleting overlapped block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.Labels)
return block.Delete(ctx, cg.logger, cg.bkt, m.ULID)
}
}
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()
Expand Down Expand Up @@ -1156,7 +1157,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err)
// instead of halting, we attempt to remove overlapped blocks and only keep the longest one.
return false, ulid.ULID{}, cg.removeOverlappedBlocks(ctx, toCompact, dir, blockDeletableChecker)
return false, ulid.ULID{}, cg.removeOverlappingBlocks(ctx, toCompact, dir, blockDeletableChecker)
}
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

Expand Down

0 comments on commit 5e8d908

Please sign in to comment.