From cfa0996f5c7a6a97a08a276de538f59dac68a7bf Mon Sep 17 00:00:00 2001 From: wangfan Date: Mon, 9 Mar 2020 17:02:36 +0800 Subject: [PATCH] update addMessage timeout interface & fix uncommit bug --- consumer/MessageReader.go | 3 +- example/talos_producer/TalosProducerDemo.go | 2 +- producer/TalosProducer.go | 41 +++++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/consumer/MessageReader.go b/consumer/MessageReader.go index ff4e1e0..1401ab2 100644 --- a/consumer/MessageReader.go +++ b/consumer/MessageReader.go @@ -122,9 +122,8 @@ func (r *MessageReader) ShouldCommit(isContinuous bool) bool { (r.finishedOffset-r.lastCommitOffset >= r.commitThreshold) } else { return (utils.CurrentTimeMills()-r.lastCommitTime >= r.commitInterval) && - (r.finishedOffset-r.lastCommitOffset >= r.commitThreshold) + (r.finishedOffset > r.lastCommitOffset) } - } func (r *MessageReader) processFetchException(err error) { diff --git a/example/talos_producer/TalosProducerDemo.go b/example/talos_producer/TalosProducerDemo.go index 55c84b8..cf884eb 100644 --- a/example/talos_producer/TalosProducerDemo.go +++ b/example/talos_producer/TalosProducerDemo.go @@ -57,7 +57,7 @@ func main() { galaxy.talos.service.endpoint=$talosServiceURI */ var propertyFilename string - flag.StringVar(&propertyFilename, "conf", "talosProducer.conf", "conf: talosConsumer.conf'") + flag.StringVar(&propertyFilename, "conf", "talosProducer.conf", "conf: talosProducer.conf'") flag.Parse() var err error diff --git a/producer/TalosProducer.go b/producer/TalosProducer.go index ce02055..ae5ef7d 100644 --- a/producer/TalosProducer.go +++ b/producer/TalosProducer.go @@ -182,6 +182,47 @@ func NewDefaultTalosProducer(producerConfig *TalosProducerConfig, credential *au return talosProducer, nil } +func (p *TalosProducer) AddUserMessageWithTimeout(msgList []*message.Message, timeoutMillis int64) error { + p.producerLock.Lock() + defer p.producerLock.Unlock() + + // check producer state + if !p.IsActive() { + return fmt.Errorf("Producer is not active, "+ + "current state: %d ", p.producerState) + } + + // check total buffered message number + startWaitTime := utils.CurrentTimeMills() + for p.bufferedCount.IsFull() { + log.Infof("too many buffered messages, globalLock is active."+ + " message number: %d, message bytes: %d", + p.bufferedCount.GetBufferedMsgNumber(), + p.bufferedCount.GetBufferedMsgBytes()) + p.producerLock.Unlock() + p.BufferFullChan <- utils.NOTIFY + + // judging wait exit by 'timeout' or 'notify' + select { + case <-p.NotifyChan: + // if receive notify, just break wait and judge if should addMessage + case <-time.After(time.Duration(timeoutMillis) * time.Millisecond): + if utils.CurrentTimeMills()-startWaitTime >= timeoutMillis { + p.producerLock.Lock() + return fmt.Errorf("Producer buffer is full and AddUserMessage"+ + " timeout by: %d millis. ", timeoutMillis) + } + } + + p.producerLock.Lock() + } + + if err := p.DoAddUserMessage(msgList); err != nil { + return err + } + return nil +} + func (p *TalosProducer) AddUserMessage(msgList []*message.Message) error { p.producerLock.Lock() defer p.producerLock.Unlock()