diff --git a/sdk/meta/conn.go b/sdk/meta/conn.go index aa5c46fd16..dccaec84e7 100644 --- a/sdk/meta/conn.go +++ b/sdk/meta/conn.go @@ -17,7 +17,6 @@ package meta import ( "fmt" "net" - "sync/atomic" "syscall" "time" @@ -27,6 +26,11 @@ import ( "github.com/chubaofs/chubaofs/util/log" ) +const ( + SendRetryInterval = 100 * time.Millisecond + SendTimeLimit = 60 * time.Second +) + type MetaConn struct { conn *net.TCPConn id uint64 //PartitionID @@ -55,13 +59,11 @@ func (mw *MetaWrapper) putConn(mc *MetaConn, err error) { func (mw *MetaWrapper) sendToMetaPartition(mp *MetaPartition, req *proto.Packet) (*proto.Packet, error) { var ( - resp *proto.Packet - err error - addr string - mc *MetaConn - start time.Time - timeout time.Duration - interval time.Duration + resp *proto.Packet + err error + addr string + mc *MetaConn + start time.Time ) errs := make(map[int]error, len(mp.Members)) var j int @@ -85,8 +87,6 @@ func (mw *MetaWrapper) sendToMetaPartition(mp *MetaPartition, req *proto.Packet) retry: start = time.Now() - timeout = time.Duration(atomic.LoadInt64(&mw.userConfig.sendRetryTimeoutSeconds)) - interval = time.Duration(atomic.LoadInt64(&mw.userConfig.sendRetryIntervalMilliseconds)) for i := 1; ; i++ { for j, addr = range mp.Members { mc, err = mw.getConn(mp.PartitionID, addr) @@ -107,12 +107,12 @@ retry: } log.LogWarnf("sendToMetaPartition: retry failed req(%v) mp(%v) mc(%v) errs(%v) resp(%v)", req, mp, mc, errs, resp) } - if time.Since(start) > timeout { + if time.Since(start) > SendTimeLimit { log.LogWarnf("sendToMetaPartition: retry timeout req(%v) mp(%v) time(%v)", req, mp, time.Since(start)) break } - log.LogWarnf("sendToMetaPartition: req(%v) mp(%v) retry in (%v) count(%v)", req, mp, interval, i) - time.Sleep(interval) + log.LogWarnf("sendToMetaPartition: req(%v) mp(%v) retry in (%v) count(%v)", req, mp, SendRetryInterval, i) + time.Sleep(SendRetryInterval) } out: diff --git a/sdk/meta/meta.go b/sdk/meta/meta.go index 929534bd79..356ec7a727 100644 --- a/sdk/meta/meta.go +++ b/sdk/meta/meta.go @@ -16,10 +16,7 @@ package meta import ( "fmt" - "net/http" - "strconv" "sync" - "sync/atomic" "syscall" "time" @@ -63,11 +60,6 @@ const ( MinForceUpdateMetaPartitionsInterval = 5 ) -const ( - SendRetryIntervalMilliseconds int64 = 100 - SendRetryTimeoutSeconds int64 = 60 -) - type AsyncTaskErrorFunc func(err error) func (f AsyncTaskErrorFunc) OnError(err error) { @@ -88,11 +80,6 @@ type MetaConfig struct { OnAsyncTaskError AsyncTaskErrorFunc } -type userConfig struct { - sendRetryIntervalMilliseconds int64 - sendRetryTimeoutSeconds int64 -} - type MetaWrapper struct { sync.RWMutex cluster string @@ -142,8 +129,6 @@ type MetaWrapper struct { // Used to trigger and throttle instant partition updates forceUpdate chan struct{} forceUpdateLimit *rate.Limiter - - userConfig userConfig } //the ticket from authnode @@ -154,44 +139,6 @@ type Ticket struct { Ticket string `json:"ticket"` } -func (mw *MetaWrapper) MetaWrapperConfig(w http.ResponseWriter, r *http.Request) { - var sendRetryInterval = atomic.LoadInt64(&mw.userConfig.sendRetryIntervalMilliseconds) - var sendRetryTimeout = atomic.LoadInt64(&mw.userConfig.sendRetryTimeoutSeconds) - - if err := r.ParseForm(); err != nil { - w.Write([]byte(err.Error())) - return - } - - if str := r.FormValue("sendRetryIntervalMilliseconds"); str != "" { - val, err := strconv.ParseInt(str, 10, 64) - if err != nil { - w.Write([]byte("Set send retry interval failed\n")) - } else { - atomic.StoreInt64(&mw.userConfig.sendRetryIntervalMilliseconds, val*int64(time.Millisecond)) - w.Write([]byte(fmt.Sprintf("Set retry interval to %v millisecond successfully\n", val))) - sendRetryInterval = val * int64(time.Millisecond) - } - } - - if str := r.FormValue("sendRetryTimeoutSeconds"); str != "" { - val, err := strconv.ParseInt(str, 10, 64) - if err != nil { - w.Write([]byte("Set send retry timeout failed\n")) - } else if val*int64(time.Second) < sendRetryInterval { - w.Write([]byte(fmt.Sprintf("Set send retry timeout failed: less than interval %v milliseconds\n", - sendRetryInterval))) - } else { - atomic.StoreInt64(&mw.userConfig.sendRetryTimeoutSeconds, val*int64(time.Second)) - w.Write([]byte(fmt.Sprintf("Set retry timeout to %v second successfully\n", val))) - sendRetryTimeout = val * int64(time.Second) - } - } - - w.Write([]byte(fmt.Sprintf("Set retry config successfully: interval %s timeout %s\n", - time.Duration(sendRetryInterval), time.Duration(sendRetryTimeout)))) -} - func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) { var err error mw := new(MetaWrapper) @@ -230,8 +177,6 @@ func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) { mw.partCond = sync.NewCond(&mw.partMutex) mw.forceUpdate = make(chan struct{}, 1) mw.forceUpdateLimit = rate.NewLimiter(1, MinForceUpdateMetaPartitionsInterval) - mw.userConfig.sendRetryTimeoutSeconds = SendRetryTimeoutSeconds * int64(time.Second) - mw.userConfig.sendRetryIntervalMilliseconds = SendRetryIntervalMilliseconds * int64(time.Millisecond) limit := MaxMountRetryLimit @@ -254,8 +199,6 @@ func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) { return nil, err } - http.HandleFunc("/mwconf", mw.MetaWrapperConfig) - go mw.refresh() return mw, nil }