Skip to content

Commit

Permalink
update addMessage timeout interface & fix uncommit bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wangfan committed Mar 9, 2020
1 parent f220291 commit cfa0996
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
3 changes: 1 addition & 2 deletions consumer/MessageReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ func (r *MessageReader) ShouldCommit(isContinuous bool) bool {
(r.finishedOffset-r.lastCommitOffset >= r.commitThreshold)
} else {
return (utils.CurrentTimeMills()-r.lastCommitTime >= r.commitInterval) &&
(r.finishedOffset-r.lastCommitOffset >= r.commitThreshold)
(r.finishedOffset > r.lastCommitOffset)
}

}

func (r *MessageReader) processFetchException(err error) {
Expand Down
2 changes: 1 addition & 1 deletion example/talos_producer/TalosProducerDemo.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
galaxy.talos.service.endpoint=$talosServiceURI
*/
var propertyFilename string
flag.StringVar(&propertyFilename, "conf", "talosProducer.conf", "conf: talosConsumer.conf'")
flag.StringVar(&propertyFilename, "conf", "talosProducer.conf", "conf: talosProducer.conf'")
flag.Parse()

var err error
Expand Down
41 changes: 41 additions & 0 deletions producer/TalosProducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,47 @@ func NewDefaultTalosProducer(producerConfig *TalosProducerConfig, credential *au
return talosProducer, nil
}

func (p *TalosProducer) AddUserMessageWithTimeout(msgList []*message.Message, timeoutMillis int64) error {
p.producerLock.Lock()
defer p.producerLock.Unlock()

// check producer state
if !p.IsActive() {
return fmt.Errorf("Producer is not active, "+
"current state: %d ", p.producerState)
}

// check total buffered message number
startWaitTime := utils.CurrentTimeMills()
for p.bufferedCount.IsFull() {
log.Infof("too many buffered messages, globalLock is active."+
" message number: %d, message bytes: %d",
p.bufferedCount.GetBufferedMsgNumber(),
p.bufferedCount.GetBufferedMsgBytes())
p.producerLock.Unlock()
p.BufferFullChan <- utils.NOTIFY

// judging wait exit by 'timeout' or 'notify'
select {
case <-p.NotifyChan:
// if receive notify, just break wait and judge if should addMessage
case <-time.After(time.Duration(timeoutMillis) * time.Millisecond):
if utils.CurrentTimeMills()-startWaitTime >= timeoutMillis {
p.producerLock.Lock()
return fmt.Errorf("Producer buffer is full and AddUserMessage"+
" timeout by: %d millis. ", timeoutMillis)
}
}

p.producerLock.Lock()
}

if err := p.DoAddUserMessage(msgList); err != nil {
return err
}
return nil
}

func (p *TalosProducer) AddUserMessage(msgList []*message.Message) error {
p.producerLock.Lock()
defer p.producerLock.Unlock()
Expand Down

0 comments on commit cfa0996

Please sign in to comment.