Skip to content

Commit

Permalink
增加是否强制提交参数
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzy2345 committed Dec 14, 2022
1 parent 0d3a3e5 commit a7e4d69
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
23 changes: 12 additions & 11 deletions kq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ const (

type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
}
7 changes: 5 additions & 2 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ func (q *kafkaQueue) startConsumers() {
for msg := range q.channel {
if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
} else {
q.consumer.CommitMessages(context.Background(), msg)
if q.c.ForceCommit {
q.consumer.CommitMessages(context.Background(), msg)
}
continue
}
q.consumer.CommitMessages(context.Background(), msg)
}
})
}
Expand Down

0 comments on commit a7e4d69

Please sign in to comment.