Skip to content

Commit

Permalink
removes dead code & unnecessary difference between lastSeen vs lastCo…
Browse files Browse the repository at this point in the history
…nsumed offsets in builder code
  • Loading branch information
owen-d committed Jan 14, 2025
1 parent 964988f commit 8601fc6
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,6 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
}
}

if lastOffset <= job.Offsets().Min {
return lastOffset, nil
}

// log success
level.Info(logger).Log(
"msg", "successfully processed job",
Expand All @@ -527,7 +523,6 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
})

var (
lastConsumedOffset = offsets.Min - 1
lastSeenOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
consecutiveTimeouts = 0
Expand All @@ -536,7 +531,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition

for lastSeenOffset < offsets.Max && boff.Ongoing() {
if consecutiveTimeouts >= maxTimeouts {
return lastConsumedOffset, fmt.Errorf("exceeded maximum consecutive timeouts (%d) while polling records", maxTimeouts)
return lastSeenOffset, fmt.Errorf("exceeded maximum consecutive timeouts (%d) while polling records", maxTimeouts)
}

if err := context.Cause(ctx); err != nil {
Expand All @@ -545,7 +540,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition

// Add timeout for each poll operation
pollCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
fs := c.PollRecords(pollCtx, int(offsets.Max-lastConsumedOffset))
fs := c.PollRecords(pollCtx, int(offsets.Max-lastSeenOffset))
cancel()

if err := fs.Err(); err != nil {
Expand Down Expand Up @@ -593,7 +588,6 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
return 0, fmt.Errorf("failed to decode record: %w", err)
}

lastConsumedOffset = record.Offset
if len(stream.Entries) == 0 {
continue
}
Expand All @@ -619,7 +613,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
}
}

return lastConsumedOffset, boff.Err()
return lastSeenOffset, boff.Err()
}

func withBackoff[T any](
Expand Down

0 comments on commit 8601fc6

Please sign in to comment.