diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index 445b58276cd5a..769350c5256ee 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -63,8 +63,15 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, // 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. // no additional validation is needed here // 2. committed offset could be behind start offset if we are falling behind retention period. + + // startOffset is the previously-committed offset, so we need to start processing the first + // _uncommitted_ offset startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset) - endOffset := partitionOffset.End.Offset + // Likewise, endOffset is initially the next available offset. In the case of race conditions + // or more likely when a partition no longer has data written to it, this will not increase. + // therefore, we decrement it to ensure we're processing data that exists. + // otherwise, the builders will get stuck polling for a final offset which will never come. + endOffset := partitionOffset.End.Offset - 1 // Skip if there's no lag if startOffset >= endOffset { diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index 626ab45e907fb..a21c3e0c19a97 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -56,6 +56,7 @@ func (s JobStatus) IsFinished() bool { } // Offsets represents the range of offsets to process +// Inclusive on both ends. type Offsets struct { Min int64 Max int64