Skip to content

Commit

Permalink
modify outofrange process
Browse files Browse the repository at this point in the history
  • Loading branch information
wangfan committed Jul 3, 2020
1 parent 0106019 commit 5aed0f3
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 26 deletions.
12 changes: 6 additions & 6 deletions consumer/MessageReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,8 @@ func (r *MessageReader) processFetchException(err error) {
"sleep a while for waiting it work.",
r.topicAndPartition.GetPartitionId(), err.Error())
time.Sleep(time.Duration(r.consumerConfig.GetWaitPartitionWorkingTime()) * time.Millisecond)
}

// process message offset out of range, reset start offset
if utils.IsOffsetOutOfRange(err) {
} else if utils.IsOffsetOutOfRange(err) {
// process message offset out of range, reset start offset
if r.consumerConfig.resetLatestOffsetWhenOutOfRange {
r.log.Warnf("Got PartitionOutOfRange error, offset by current latest offset.")
atomic.StoreInt64(r.startOffset, int64(message.MessageOffset_LATEST_OFFSET))
Expand All @@ -147,7 +145,9 @@ func (r *MessageReader) processFetchException(err error) {
r.lastCommitOffset, r.finishedOffset = -1, -1
r.lastCommitTime = utils.CurrentTimeMills()
}
} else {
r.log.Errorf("Reading message from topic: %v of partition: %d failed: %s",
r.topicAndPartition.GetTopicTalosResourceName(),
r.topicAndPartition.GetPartitionId(), err)
}

r.log.Warnf("process unexcepted fetchException: %s", err.Error())
}
3 changes: 1 addition & 2 deletions consumer/SimpleConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,14 @@ func (c *SimpleConsumer) FetchMessage(startOffset, maxFetchedNumber int64) (
c.topicAndPartition).GetMessage(getMessageRequest)
if err != nil {
if c.scheduleInfoCache != nil && c.scheduleInfoCache.IsAutoLocation() {
c.log.Warnf("can't connect to the host directly, refresh "+
c.log.Debugf("can't connect to the host directly, refresh "+
"scheduleInfo and retry using url. The exception is: %s."+
" Ignore this if not frequently.", err.Error())
c.scheduleInfoCache.UpdateScheduleInfoCache()
timestamp := utils.CurrentTimeMills() + c.consumerConfig.ClientTimeout()
getMessageRequest.TimeoutTimestamp = &timestamp
getMessageResponse, err = c.messageClient.GetMessage(getMessageRequest)
if err != nil {
c.log.Errorf("getMessage error: %s", err.Error())
return nil, err
}
} else {
Expand Down
13 changes: 3 additions & 10 deletions consumer/TalosMessageReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"

"github.com/XiaoMi/talos-sdk-golang/thrift/common"
"github.com/XiaoMi/talos-sdk-golang/thrift/consumer"
"github.com/XiaoMi/talos-sdk-golang/utils"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -84,15 +83,9 @@ func (r *TalosMessageReader) FetchData() {
messageList, err := r.simpleConsumer.FetchMessage(
atomic.LoadInt64(r.startOffset), r.consumerConfig.GetMaxFetchRecords())
if err != nil {
if t, ok := err.(*common.GalaxyTalosException); ok {
r.log.Errorf("Reading message from topic: %v of partition: %d failed: %s",
r.topicAndPartition.GetTopicTalosResourceName(),
r.topicAndPartition.GetPartitionId(), t.GetDetails())
r.processFetchException(t)
r.lastFetchTime = utils.CurrentTimeMills()
return
}
r.log.Errorf("Unknow Exception when fetchMessage: %s", err.Error())
r.processFetchException(err)
r.lastFetchTime = utils.CurrentTimeMills()
return
}

r.lastFetchTime = utils.CurrentTimeMills()
Expand Down
10 changes: 2 additions & 8 deletions utils/Utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,11 @@ func IsTopicNotExist(err error) bool {
}

func IsPartitionNotServing(err error) bool {
if talosError, ok := err.(*common.GalaxyTalosException); ok {
return talosError.GetErrorCode() == common.ErrorCode_PARTITION_NOT_SERVING
}
return false
return strings.Contains(err.Error(), "partition not serving error")
}

func IsOffsetOutOfRange(err error) bool {
if talosError, ok := err.(*common.GalaxyTalosException); ok {
return talosError.GetErrorCode() == common.ErrorCode_MESSAGE_OFFSET_OUT_OF_RANGE
}
return false
return strings.Contains(err.Error(), "Message out range")
}

func UpdateMessage(msg *message.Message, messageType message.MessageType) {
Expand Down

0 comments on commit 5aed0f3

Please sign in to comment.