Skip to content

Commit

Permalink
Merge pull request #39 from a289459798/master
Browse files Browse the repository at this point in the history
增加是否强制提交参数
  • Loading branch information
kevwan authored Dec 16, 2022
2 parents 3e19b1f + a7e4d69 commit 6697b1c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
23 changes: 12 additions & 11 deletions kq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ const (

type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
}
4 changes: 4 additions & 0 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ func (q *kafkaQueue) startConsumers() {
for msg := range q.channel {
if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
if q.c.ForceCommit {
q.consumer.CommitMessages(context.Background(), msg)
}
continue
}
q.consumer.CommitMessages(context.Background(), msg)
}
Expand Down

0 comments on commit 6697b1c

Please sign in to comment.