Skip to content

Commit

Permalink
adjusts job building for inclusivity to prevent endless polling
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Jan 14, 2025
1 parent 8601fc6 commit d73f0b5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,15 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int,
// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
// 2. committed offset could be behind start offset if we are falling behind retention period.

// startOffset is the previously-committed offset, so we need to start processing the first
// _uncommitted_ offset
startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset)
endOffset := partitionOffset.End.Offset
// Likewise, endOffset is initially the next available offset. In the case of race conditions
// or more likely when a partition no longer has data written to it, this will not increase.
// therefore, we decrement it to ensure we're processing data that exists.
// otherwise, the builders will get stuck polling for a final offset which will never come.
endOffset := partitionOffset.End.Offset - 1

// Skip if there's no lag
if startOffset >= endOffset {
Expand Down
1 change: 1 addition & 0 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s JobStatus) IsFinished() bool {
}

// Offsets represents the range of offsets to process
// Inclusive on both ends.
type Offsets struct {
Min int64
Max int64
Expand Down

0 comments on commit d73f0b5

Please sign in to comment.