From 5aed0f3c49da092bd01457d6d2964d7be3302bf7 Mon Sep 17 00:00:00 2001 From: wangfan Date: Fri, 3 Jul 2020 11:50:09 +0800 Subject: [PATCH] modify outofrange process --- consumer/MessageReader.go | 12 ++++++------ consumer/SimpleConsumer.go | 3 +-- consumer/TalosMessageReader.go | 13 +++---------- utils/Utils.go | 10 ++-------- 4 files changed, 12 insertions(+), 26 deletions(-) diff --git a/consumer/MessageReader.go b/consumer/MessageReader.go index 1264598..2b3420e 100644 --- a/consumer/MessageReader.go +++ b/consumer/MessageReader.go @@ -133,10 +133,8 @@ func (r *MessageReader) processFetchException(err error) { "sleep a while for waiting it work.", r.topicAndPartition.GetPartitionId(), err.Error()) time.Sleep(time.Duration(r.consumerConfig.GetWaitPartitionWorkingTime()) * time.Millisecond) - } - - // process message offset out of range, reset start offset - if utils.IsOffsetOutOfRange(err) { + } else if utils.IsOffsetOutOfRange(err) { + // process message offset out of range, reset start offset if r.consumerConfig.resetLatestOffsetWhenOutOfRange { r.log.Warnf("Got PartitionOutOfRange error, offset by current latest offset.") atomic.StoreInt64(r.startOffset, int64(message.MessageOffset_LATEST_OFFSET)) @@ -147,7 +145,9 @@ func (r *MessageReader) processFetchException(err error) { r.lastCommitOffset, r.finishedOffset = -1, -1 r.lastCommitTime = utils.CurrentTimeMills() } + } else { + r.log.Errorf("Reading message from topic: %v of partition: %d failed: %s", + r.topicAndPartition.GetTopicTalosResourceName(), + r.topicAndPartition.GetPartitionId(), err) } - - r.log.Warnf("process unexcepted fetchException: %s", err.Error()) } diff --git a/consumer/SimpleConsumer.go b/consumer/SimpleConsumer.go index 478c787..9f8317c 100644 --- a/consumer/SimpleConsumer.go +++ b/consumer/SimpleConsumer.go @@ -231,7 +231,7 @@ func (c *SimpleConsumer) FetchMessage(startOffset, maxFetchedNumber int64) ( c.topicAndPartition).GetMessage(getMessageRequest) if err != nil { if c.scheduleInfoCache != nil && c.scheduleInfoCache.IsAutoLocation() { - c.log.Warnf("can't connect to the host directly, refresh "+ + c.log.Debugf("can't connect to the host directly, refresh "+ "scheduleInfo and retry using url. The exception is: %s."+ " Ignore this if not frequently.", err.Error()) c.scheduleInfoCache.UpdateScheduleInfoCache() @@ -239,7 +239,6 @@ func (c *SimpleConsumer) FetchMessage(startOffset, maxFetchedNumber int64) ( getMessageRequest.TimeoutTimestamp = ×tamp getMessageResponse, err = c.messageClient.GetMessage(getMessageRequest) if err != nil { - c.log.Errorf("getMessage error: %s", err.Error()) return nil, err } } else { diff --git a/consumer/TalosMessageReader.go b/consumer/TalosMessageReader.go index a2d2cde..6e1e2c1 100644 --- a/consumer/TalosMessageReader.go +++ b/consumer/TalosMessageReader.go @@ -10,7 +10,6 @@ import ( "sync/atomic" "time" - "github.com/XiaoMi/talos-sdk-golang/thrift/common" "github.com/XiaoMi/talos-sdk-golang/thrift/consumer" "github.com/XiaoMi/talos-sdk-golang/utils" "github.com/sirupsen/logrus" @@ -84,15 +83,9 @@ func (r *TalosMessageReader) FetchData() { messageList, err := r.simpleConsumer.FetchMessage( atomic.LoadInt64(r.startOffset), r.consumerConfig.GetMaxFetchRecords()) if err != nil { - if t, ok := err.(*common.GalaxyTalosException); ok { - r.log.Errorf("Reading message from topic: %v of partition: %d failed: %s", - r.topicAndPartition.GetTopicTalosResourceName(), - r.topicAndPartition.GetPartitionId(), t.GetDetails()) - r.processFetchException(t) - r.lastFetchTime = utils.CurrentTimeMills() - return - } - r.log.Errorf("Unknow Exception when fetchMessage: %s", err.Error()) + r.processFetchException(err) + r.lastFetchTime = utils.CurrentTimeMills() + return } r.lastFetchTime = utils.CurrentTimeMills() diff --git a/utils/Utils.go b/utils/Utils.go index 4aaba84..cbd0647 100644 --- a/utils/Utils.go +++ b/utils/Utils.go @@ -219,17 +219,11 @@ func IsTopicNotExist(err error) bool { } func IsPartitionNotServing(err error) bool { - if talosError, ok := err.(*common.GalaxyTalosException); ok { - return talosError.GetErrorCode() == common.ErrorCode_PARTITION_NOT_SERVING - } - return false + return strings.Contains(err.Error(), "partition not serving error") } func IsOffsetOutOfRange(err error) bool { - if talosError, ok := err.(*common.GalaxyTalosException); ok { - return talosError.GetErrorCode() == common.ErrorCode_MESSAGE_OFFSET_OUT_OF_RANGE - } - return false + return strings.Contains(err.Error(), "Message out range") } func UpdateMessage(msg *message.Message, messageType message.MessageType) {