Skip to content

Commit

Permalink
Revert "sdk: Add mwconf API to config MetaWrapper"
Browse files Browse the repository at this point in the history
This reverts commit bba640a.

Signed-off-by: shengyong002 <[email protected]>
  • Loading branch information
shengyong002 authored and liushuoran001 committed Mar 9, 2022
1 parent 974d33b commit d3ee7ae
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 70 deletions.
26 changes: 13 additions & 13 deletions sdk/meta/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package meta
import (
"fmt"
"net"
"sync/atomic"
"syscall"
"time"

Expand All @@ -27,6 +26,11 @@ import (
"github.com/chubaofs/chubaofs/util/log"
)

const (
SendRetryInterval = 100 * time.Millisecond
SendTimeLimit = 60 * time.Second
)

type MetaConn struct {
conn *net.TCPConn
id uint64 //PartitionID
Expand Down Expand Up @@ -55,13 +59,11 @@ func (mw *MetaWrapper) putConn(mc *MetaConn, err error) {

func (mw *MetaWrapper) sendToMetaPartition(mp *MetaPartition, req *proto.Packet) (*proto.Packet, error) {
var (
resp *proto.Packet
err error
addr string
mc *MetaConn
start time.Time
timeout time.Duration
interval time.Duration
resp *proto.Packet
err error
addr string
mc *MetaConn
start time.Time
)
errs := make(map[int]error, len(mp.Members))
var j int
Expand All @@ -85,8 +87,6 @@ func (mw *MetaWrapper) sendToMetaPartition(mp *MetaPartition, req *proto.Packet)

retry:
start = time.Now()
timeout = time.Duration(atomic.LoadInt64(&mw.userConfig.sendRetryTimeoutSeconds))
interval = time.Duration(atomic.LoadInt64(&mw.userConfig.sendRetryIntervalMilliseconds))
for i := 1; ; i++ {
for j, addr = range mp.Members {
mc, err = mw.getConn(mp.PartitionID, addr)
Expand All @@ -107,12 +107,12 @@ retry:
}
log.LogWarnf("sendToMetaPartition: retry failed req(%v) mp(%v) mc(%v) errs(%v) resp(%v)", req, mp, mc, errs, resp)
}
if time.Since(start) > timeout {
if time.Since(start) > SendTimeLimit {
log.LogWarnf("sendToMetaPartition: retry timeout req(%v) mp(%v) time(%v)", req, mp, time.Since(start))
break
}
log.LogWarnf("sendToMetaPartition: req(%v) mp(%v) retry in (%v) count(%v)", req, mp, interval, i)
time.Sleep(interval)
log.LogWarnf("sendToMetaPartition: req(%v) mp(%v) retry in (%v) count(%v)", req, mp, SendRetryInterval, i)
time.Sleep(SendRetryInterval)
}

out:
Expand Down
57 changes: 0 additions & 57 deletions sdk/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package meta

import (
"fmt"
"net/http"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -63,11 +60,6 @@ const (
MinForceUpdateMetaPartitionsInterval = 5
)

const (
SendRetryIntervalMilliseconds int64 = 100
SendRetryTimeoutSeconds int64 = 60
)

type AsyncTaskErrorFunc func(err error)

func (f AsyncTaskErrorFunc) OnError(err error) {
Expand All @@ -88,11 +80,6 @@ type MetaConfig struct {
OnAsyncTaskError AsyncTaskErrorFunc
}

type userConfig struct {
sendRetryIntervalMilliseconds int64
sendRetryTimeoutSeconds int64
}

type MetaWrapper struct {
sync.RWMutex
cluster string
Expand Down Expand Up @@ -142,8 +129,6 @@ type MetaWrapper struct {
// Used to trigger and throttle instant partition updates
forceUpdate chan struct{}
forceUpdateLimit *rate.Limiter

userConfig userConfig
}

//the ticket from authnode
Expand All @@ -154,44 +139,6 @@ type Ticket struct {
Ticket string `json:"ticket"`
}

func (mw *MetaWrapper) MetaWrapperConfig(w http.ResponseWriter, r *http.Request) {
var sendRetryInterval = atomic.LoadInt64(&mw.userConfig.sendRetryIntervalMilliseconds)
var sendRetryTimeout = atomic.LoadInt64(&mw.userConfig.sendRetryTimeoutSeconds)

if err := r.ParseForm(); err != nil {
w.Write([]byte(err.Error()))
return
}

if str := r.FormValue("sendRetryIntervalMilliseconds"); str != "" {
val, err := strconv.ParseInt(str, 10, 64)
if err != nil {
w.Write([]byte("Set send retry interval failed\n"))
} else {
atomic.StoreInt64(&mw.userConfig.sendRetryIntervalMilliseconds, val*int64(time.Millisecond))
w.Write([]byte(fmt.Sprintf("Set retry interval to %v millisecond successfully\n", val)))
sendRetryInterval = val * int64(time.Millisecond)
}
}

if str := r.FormValue("sendRetryTimeoutSeconds"); str != "" {
val, err := strconv.ParseInt(str, 10, 64)
if err != nil {
w.Write([]byte("Set send retry timeout failed\n"))
} else if val*int64(time.Second) < sendRetryInterval {
w.Write([]byte(fmt.Sprintf("Set send retry timeout failed: less than interval %v milliseconds\n",
sendRetryInterval)))
} else {
atomic.StoreInt64(&mw.userConfig.sendRetryTimeoutSeconds, val*int64(time.Second))
w.Write([]byte(fmt.Sprintf("Set retry timeout to %v second successfully\n", val)))
sendRetryTimeout = val * int64(time.Second)
}
}

w.Write([]byte(fmt.Sprintf("Set retry config successfully: interval %s timeout %s\n",
time.Duration(sendRetryInterval), time.Duration(sendRetryTimeout))))
}

func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) {
var err error
mw := new(MetaWrapper)
Expand Down Expand Up @@ -230,8 +177,6 @@ func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) {
mw.partCond = sync.NewCond(&mw.partMutex)
mw.forceUpdate = make(chan struct{}, 1)
mw.forceUpdateLimit = rate.NewLimiter(1, MinForceUpdateMetaPartitionsInterval)
mw.userConfig.sendRetryTimeoutSeconds = SendRetryTimeoutSeconds * int64(time.Second)
mw.userConfig.sendRetryIntervalMilliseconds = SendRetryIntervalMilliseconds * int64(time.Millisecond)

limit := MaxMountRetryLimit

Expand All @@ -254,8 +199,6 @@ func NewMetaWrapper(config *MetaConfig) (*MetaWrapper, error) {
return nil, err
}

http.HandleFunc("/mwconf", mw.MetaWrapperConfig)

go mw.refresh()
return mw, nil
}
Expand Down

0 comments on commit d3ee7ae

Please sign in to comment.