Skip to content
This repository has been archived by the owner on Jun 23, 2023. It is now read-only.

Commit

Permalink
consumer: use default retry interval when MaxRetryInterval is zero
Browse files Browse the repository at this point in the history
It seems that kafka-go is setting MaxRetryInterval to zero
if a service doesn't explicitly set a retry interval, and
that will result in backoff with no time limit on the retry interval,
so use the default retry interval whenever MaxRetryInterval
is zero.

As retries have been removed from felice in master but we want
to retrofix this without upgrading kafka-go to the latest felice,
this is targetted at the sendmessages-v0.4.0 feature branch,
which will also contain the new SendMessages feature backported.
  • Loading branch information
rogpeppe committed Dec 12, 2019
1 parent 9e5bce5 commit ca6bcb5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
3 changes: 2 additions & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
KeyCodec codec.Codec
}

const defaultRetryInterval = 5 * time.Second

// NewConfig creates a config with sane defaults.
// The Sarama Cluster group mode will always be overwritten by the consumer
// and thus cannot be changed, as the consumer is designed to use the ConsumerModePartitions mode.
Expand All @@ -40,7 +42,6 @@ func NewConfig(clientID string) Config {
c.Group.Mode = cluster.ConsumerModePartitions

// Felice consumer configuration
c.MaxRetryInterval = 5 * time.Second
c.KeyCodec = codec.String() // defaults to String
return c
}
3 changes: 3 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (c *Consumer) setup() {
// Note: the logic in handleMsg assumes that
// this does not terminate; be aware of that when changing
// this strategy.
if c.config.MaxRetryInterval <= 0 {
c.config.MaxRetryInterval = defaultRetryInterval
}
c.retryStrategy = retry.Exponential{
Initial: time.Millisecond,
Factor: 2,
Expand Down

0 comments on commit ca6bcb5

Please sign in to comment.