chore(blockbuilder): cleanup #15730
base: main
Conversation
owen-d commented Jan 13, 2025 • edited
- Rewrites the queue for more reliable & safe state transitions (no more negative pending jobs)
- Handles an off-by-one polling error where builders would poll indefinitely for a partition's final offset, which didn't exist
- Adds backoff & error propagation logic for polling so jobs can fail after 3 unsuccessful attempts (a sketch of this retry behavior follows below).
fails job after 3 successive kafka polling errors
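For context, a minimal sketch of that retry behavior, assuming the dskit backoff package and a hypothetical pollPartition helper (the actual names and limits in the PR may differ):

```go
package builder

import (
	"context"
	"time"

	"github.com/grafana/dskit/backoff"
)

// Sketch only: retry a poll with bounded backoff and propagate the error
// after three unsuccessful attempts so the owning job can be failed.
// pollPartition is a hypothetical stand-in for the real kgo poll call.
func pollWithRetries(ctx context.Context, pollPartition func(context.Context) error) error {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: 3, // give up (and fail the job) after 3 unsuccessful polls
	})

	var lastErr error
	for boff.Ongoing() {
		if lastErr = pollPartition(ctx); lastErr == nil {
			return nil
		}
		boff.Wait()
	}
	if lastErr != nil {
		return lastErr // propagate the last polling error up to the job
	}
	return boff.Err() // context cancellation or retries exhausted
}
```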
pkg/blockbuilder/builder/builder.go
Outdated
@@ -592,7 +616,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
 		}
 	}
 
-	return lastConsumedOffset, boff.Err()
+	return lastSeenOffset, boff.Err()
Yeah, that's a fair point. The reason I removed this is that, outside of logging, it's not used anywhere. It's returned up the chain and logged, but it doesn't influence behavior, so I figured reducing unnecessary complexity made sense here. The new behavior is that we log the last seen offset rather than the last consumed offset. However, I should probably just log the final consumed offset (rather than the last seen one) without threading the return value through our code unnecessarily.
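A hedged sketch of that direction, using go-kit logging and illustrative names (not the PR's actual code): track the final consumed offset locally and log it, returning only the error.

```go
package builder

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Illustrative only: record the last consumed offset purely for logging and
// return just the error, instead of threading the offset up the call chain.
func consumeAndLog(logger log.Logger, offsets []int64, consume func(int64) error) error {
	lastConsumed := int64(-1)
	for _, off := range offsets {
		if err := consume(off); err != nil {
			level.Debug(logger).Log("msg", "stopped consuming", "last_consumed_offset", lastConsumed, "err", err)
			return err
		}
		lastConsumed = off
	}
	level.Debug(logger).Log("msg", "finished loading records", "last_consumed_offset", lastConsumed)
	return nil
}
```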
startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset)
// Likewise, endOffset is initially the next available offset: this is why we treat jobs as end-exclusive:
Shouldn't we include the endOffset, since it is a valid offset for a record in the partition? We'd be skipping the last record otherwise.
WDYM? The end offset in this case is exclusive: it's the next available record in partitions we're caught up on. Trying to poll for it when the partition is no longer being written to causes us to hang forever waiting for data that will never appear. This is the reason we were seeing infinite polling on defunct partitions.
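To illustrate the convention being described (types and field names here are assumptions, not the PR's code): jobs cover the half-open range [startOffset, endOffset), so a fully caught-up partition yields an empty range instead of a poll that can never return.

```go
package builder

// offsetRange is an illustrative stand-in for a job's offset bounds.
// startOffset is inclusive; endOffset is exclusive: it is the next offset
// Kafka would assign, so it may not correspond to an existing record.
type offsetRange struct {
	startOffset int64
	endOffset   int64 // exclusive
}

// remaining reports how many records the job still needs to consume.
// When the partition is fully caught up, startOffset == endOffset and the
// job is already done; polling for endOffset itself would hang forever on
// a partition that is no longer being written to.
func (r offsetRange) remaining() int64 {
	if r.startOffset >= r.endOffset {
		return 0
	}
	return r.endOffset - r.startOffset
}

// done reports whether nextOffset has reached the exclusive end of the range.
func (r offsetRange) done(nextOffset int64) bool {
	return nextOffset >= r.endOffset
}
```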
pkg/blockbuilder/scheduler/queue.go
Outdated
@@ -88,25 +88,21 @@ type JobQueue struct {
 	metrics *jobQueueMetrics
 }
 
-// NewJobQueue creates a new job queue instance
+// NewJobQueue creates a new JobQueue2 instance
nit: needs correction
	q.metrics.inProgress.Inc()
	job.StartTime = job.UpdateTime
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
	q.completed.Push(job)
nit: we might have to remove the evicted job from status map
good catch!
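A minimal sketch of what this fix could look like, with assumed names for the eviction hook and buffer (not the PR's exact code): when pushing onto the completed buffer evicts an older job, the queue also deletes that job from statusMap.

```go
package scheduler

import "sync"

// Illustrative types only; the real JobQueue in the PR differs.
type jobStatus int

type job struct{ id string }

type jobQueueSketch struct {
	mu        sync.Mutex
	statusMap map[string]jobStatus
	completed *boundedBuffer
}

// markCompleted pushes the job onto the completed buffer and, if that push
// evicts an older completed job, removes the evicted job from statusMap too.
func (q *jobQueueSketch) markCompleted(j *job, status jobStatus) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.statusMap[j.id] = status
	if evicted, ok := q.completed.Push(j); ok {
		// the reviewer's point: forget evicted jobs entirely
		delete(q.statusMap, evicted.id)
	}
}

// boundedBuffer is a tiny fixed-size FIFO used only for this sketch.
type boundedBuffer struct {
	max   int
	items []*job
}

// Push appends j and returns the evicted oldest item when capacity is exceeded.
func (b *boundedBuffer) Push(j *job) (*job, bool) {
	b.items = append(b.items, j)
	if len(b.items) <= b.max {
		return nil, false
	}
	evicted := b.items[0]
	b.items = b.items[1:]
	return evicted, true
}
```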
pkg/blockbuilder/scheduler/queue.go
Outdated
	q.statusMap[jobMeta.ID()] = types.JobStatusInProgress
	q.metrics.inProgress.Inc()
if finished {
	level.Debug(q.logger).Log("msg", "ignoring transition for completed job; will recreate", "id", jobID, "from", currentStatus, "to", to)
is this comment correct?
Yes, but perhaps I can be clearer: we're not going to delete the old completed entry for this job. Instead we're going to recreate a copy.
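A rough sketch of that behavior under assumed names (not the PR's actual implementation): the old completed entry is left in place and a fresh copy of the job is enqueued as pending.

```go
package scheduler

import "sync"

// Illustrative types only; the real scheduler types in the PR differ.
type jobStatus int

const statusPending jobStatus = iota

type job struct{ id string }

type queueSketch struct {
	mu        sync.Mutex
	pending   []*job
	statusMap map[string]jobStatus
}

// transition illustrates the behavior described above: a transition request
// for an already-completed job does not delete the old completed entry; the
// queue instead recreates the job as a fresh pending copy.
func (q *queueSketch) transition(j *job, alreadyFinished bool, to jobStatus) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if alreadyFinished {
		fresh := &job{id: j.id}
		q.pending = append(q.pending, fresh)
		q.statusMap[j.id] = statusPending
		return
	}
	q.statusMap[j.id] = to
}
```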
beforeSync := time.Now()
q.SyncJob(jobID, job)
afterSync := time.Now()
func TestJobQueue2_TransitionState(t *testing.T) {
rename tests that contain Queue2