
Commit

Revert "Check time spent on attempting RPC to avoid spending too much…
Browse files Browse the repository at this point in the history
… time on retrying (#1117) (#1131)" (#1133)

This reverts commit 062f4c9.

Signed-off-by: MyonKeminta <[email protected]>
Co-authored-by: MyonKeminta <[email protected]>
MyonKeminta and MyonKeminta authored Jan 24, 2024
1 parent 062f4c9 commit 5c8e782
Showing 3 changed files with 18 additions and 159 deletions.
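
For orientation before the diff: the change being reverted had added a per-replica time budget on top of the plain attempt counter, so after this revert a replica is considered exhausted by attempt count alone. Below is a minimal, self-contained sketch of the post-revert check; the `replica` type here is a trimmed stand-in for the real struct in region_request.go, kept only to make the rule concrete.

package main

import "fmt"

// replica is a trimmed stand-in for the real struct in region_request.go;
// after the revert it only tracks how many times it has been attempted.
type replica struct {
	attempts int
}

const maxReplicaAttempt = 10

// isExhausted mirrors the post-revert logic: a replica is exhausted once the
// attempt count reaches the limit; no elapsed-time budget is consulted.
func (r *replica) isExhausted(maxAttempt int) bool {
	return r.attempts >= maxAttempt
}

func main() {
	r := &replica{attempts: maxReplicaAttempt}
	fmt.Println(r.isExhausted(maxReplicaAttempt)) // true: attempt limit reached
}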
3 changes: 0 additions & 3 deletions internal/locate/main_test.go
@@ -17,13 +17,10 @@ package locate
 import (
 	"testing"
 
-	"github.com/tikv/client-go/v2/util"
 	"go.uber.org/goleak"
 )
 
 func TestMain(m *testing.M) {
-	util.EnableFailpoints()
-
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
 	}
68 changes: 18 additions & 50 deletions internal/locate/region_request.go
@@ -236,19 +236,18 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *tikvrpc.Request,
 }
 
 type replica struct {
-	store         *Store
-	peer          *metapb.Peer
-	epoch         uint32
-	attempts      int
-	attemptedTime time.Duration
+	store    *Store
+	peer     *metapb.Peer
+	epoch    uint32
+	attempts int
 }
 
 func (r *replica) isEpochStale() bool {
 	return r.epoch != atomic.LoadUint32(&r.store.epoch)
 }
 
-func (r *replica) isExhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
-	return r.attempts >= maxAttempt || (maxAttemptTime > 0 && r.attemptedTime >= maxAttemptTime)
+func (r *replica) isExhausted(maxAttempt int) bool {
+	return r.attempts >= maxAttempt
 }
 
 type replicaSelector struct {
@@ -337,7 +336,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// will not be wakened up and re-elect the leader until the follower receives
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
-	if liveness != reachable || leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
+	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
 		return nil, stateChanged{}
 	}
@@ -352,7 +351,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
 		return
 	}
-	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
+	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
 	}
 	if liveness != reachable {
@@ -409,7 +408,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if selector.targetIdx < 0 {
 		// Search replica that is not attempted from the last accessed replica
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
-			return !selectReplica.isExhausted(1, 0)
+			return !selectReplica.isExhausted(1)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
@@ -537,7 +536,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
 
 func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool {
 	// Try each peer only once
-	return idx != state.leaderIdx && !replica.isExhausted(1, 0)
+	return idx != state.leaderIdx && !replica.isExhausted(1)
 }
 
 func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) {
@@ -669,9 +668,9 @@ func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
 	// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
 	// 4. The leader peer should be retried again using snapshot read.
 	if state.isStaleRead && state.option.leaderOnly {
-		return leader.isExhausted(2, 0)
+		return leader.isExhausted(2)
 	}
-	return leader.isExhausted(1, 0)
+	return leader.isExhausted(1)
 }
 
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -681,7 +680,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
-	if replica.isEpochStale() || replica.isExhausted(1, 0) || replica.store.getLivenessState() == unreachable {
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
 		return false
 	}
 	if state.option.leaderOnly && idx == state.leaderIdx {
@@ -752,16 +751,6 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 		}
 	}
 
-	if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
-		attemptedTime, err := time.ParseDuration(val.(string))
-		if err != nil {
-			panic(err)
-		}
-		for _, r := range replicas {
-			r.attemptedTime = attemptedTime
-		}
-	}
-
 	return &replicaSelector{
 		regionCache,
 		cachedRegion,
@@ -774,13 +763,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 	}, nil
 }
 
-const (
-	maxReplicaAttempt = 10
-	// The maximum time to allow retrying sending requests after RPC failure. In case an RPC request fails after
-	// timeout (there might be network issue or the TiKV node stuck), we use this to avoid retrying 10 times which may cost too much time.
-	// For request using `client.ReadTimeoutShort` which is 30s, it might retry twice which costs 1min.
-	maxReplicaAttemptTime = time.Second * 50
-)
+const maxReplicaAttempt = 10
 
 // next creates the RPCContext of the current candidate replica.
 // It returns a SendError if runs out of all replicas or the cached region is invalidated.
@@ -843,9 +826,8 @@ func (s *replicaSelector) refreshRegionStore() {
 		// request is sent to the leader.
 		newLeaderIdx := newRegionStore.workTiKVIdx
 		s.state = &accessKnownLeader{leaderIdx: newLeaderIdx}
-		if s.replicas[newLeaderIdx].isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
-			s.replicas[newLeaderIdx].attempts = maxReplicaAttempt - 1
-			s.replicas[newLeaderIdx].attemptedTime = 0
+		if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt {
+			s.replicas[newLeaderIdx].attempts--
 		}
 	}
 }
@@ -957,11 +939,10 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 			if replica.store.getLivenessState() != reachable {
 				return
 			}
-			if replica.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
+			if replica.isExhausted(maxReplicaAttempt) {
 				// Give the replica one more chance and because each follower is tried only once,
 				// it won't result in infinite retry.
 				replica.attempts = maxReplicaAttempt - 1
-				replica.attemptedTime = 0
 			}
 			s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
 			// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
@@ -1438,12 +1419,8 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
 	if !injectFailOnSend {
 		start := time.Now()
 		resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
-		rpcDuration := time.Since(start)
-		if s.replicaSelector != nil {
-			s.replicaSelector.recordAttemptedTime(rpcDuration)
-		}
 		if s.Stats != nil {
-			RecordRegionRequestRuntimeStats(s.Stats, req.Type, rpcDuration)
+			RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
 			if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
 				if val.(bool) {
 					if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 {
@@ -1996,12 +1973,3 @@ func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCCo
 	}
 	sb.WriteString(req.ReadType)
 }
-
-func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
-	if targetReplica := s.targetReplica(); targetReplica != nil {
-		targetReplica.attemptedTime += duration
-	}
-	if proxyReplica := s.proxyReplica(); proxyReplica != nil {
-		proxyReplica.attemptedTime += duration
-	}
-}
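
To summarize the removed behavior for readers skimming the hunks above: before this revert, each RPC attempt's duration was accumulated per replica (recordAttemptedTime), and isExhausted also treated a replica as exhausted once its accumulated time reached maxReplicaAttemptTime (50s). The following is a condensed, hypothetical sketch pieced together from the removed lines, not the actual client-go code:

package main

import (
	"fmt"
	"time"
)

// Condensed sketch of the reverted logic: attempts and accumulated RPC time
// are both tracked, and hitting either limit marks the replica as exhausted.
type replica struct {
	attempts      int
	attemptedTime time.Duration
}

const (
	maxReplicaAttempt     = 10
	maxReplicaAttemptTime = 50 * time.Second
)

func (r *replica) isExhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
	return r.attempts >= maxAttempt ||
		(maxAttemptTime > 0 && r.attemptedTime >= maxAttemptTime)
}

// recordAttempt mimics the removed recordAttemptedTime: after each RPC the
// elapsed time is added to the replica's budget.
func (r *replica) recordAttempt(rpcDuration time.Duration) {
	r.attempts++
	r.attemptedTime += rpcDuration
}

func main() {
	r := &replica{}
	r.recordAttempt(30 * time.Second) // e.g. an attempt that ran into a 30s read timeout
	r.recordAttempt(25 * time.Second)
	// Two slow attempts exceed the 50s budget even though attempts < 10,
	// which is exactly the behavior this commit reverts.
	fmt.Println(r.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime)) // true
}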
106 changes: 0 additions & 106 deletions internal/locate/region_request3_test.go
@@ -36,15 +36,12 @@ package locate
 
 import (
 	"context"
-	"fmt"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 	"unsafe"
 
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -1316,106 +1313,3 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
 		}
 	}
 }
-
-func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
-	key := []byte("key")
-	value := []byte("value1")
-
-	s.NoError(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
-	defer func() {
-		s.NoError(failpoint.Disable("tikvclient/injectLiveness"))
-	}()
-
-	region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false)
-	s.Nil(err)
-	regionStore := region.getStore()
-	oldLeader, oldLeaderPeer, _, _ := region.WorkStorePeer(regionStore)
-	// The follower will become the new leader later
-	follower, followerPeer, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
-
-	currLeader := struct {
-		sync.Mutex
-		addr string
-		peer *metapb.Peer
-	}{
-		addr: oldLeader.addr,
-		peer: oldLeaderPeer,
-	}
-
-	requestHandled := false
-
-	s.regionRequestSender.client = &fnClient{
-		fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
-			if addr == oldLeader.addr {
-				time.Sleep(timeout)
-				return nil, context.DeadlineExceeded
-			}
-
-			currLeader.Lock()
-			leaderAddr := currLeader.addr
-			leaderPeer := currLeader.peer
-			currLeader.Unlock()
-
-			if addr != leaderAddr {
-				return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{
-					RegionId: region.GetID(),
-					Leader:   leaderPeer,
-				}}}}, nil
-			}
-
-			requestHandled = true
-			return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil
-		},
-	}
-
-	// Simulate the attempted time is nearly reached so that the test won't take too much time to run.
-	// But the `replicaSelector` of the request sender is not initialized yet before sending any request.
-	// So try to control it by using a failpoint.
-	s.NoError(failpoint.Enable("tikvclient/newReplicaSelectorInitialAttemptedTime", fmt.Sprintf(`return("%s")`, (maxReplicaAttemptTime-time.Second).String())))
-	defer func() {
-		s.NoError(failpoint.Disable("tikvclient/newReplicaSelectorInitialAttemptedTime"))
-	}()
-
-	resCh := make(chan struct {
-		resp *tikvrpc.Response
-		err  error
-	})
-	startTime := time.Now()
-	go func() {
-		bo := retry.NewBackoffer(context.Background(), -1)
-		req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{
-			Mutations: []*kvrpcpb.Mutation{{
-				Op:    kvrpcpb.Op_Put,
-				Key:   key,
-				Value: value,
-			}},
-			StartVersion: 100,
-		})
-		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV)
-		resCh <- struct {
-			resp *tikvrpc.Response
-			err  error
-		}{resp: resp, err: err}
-	}()
-
-	select {
-	case res := <-resCh:
-		s.Fail("request finished too early", fmt.Sprintf("resp: %s, error: %+q", res.resp, res.err))
-	case <-time.After(time.Millisecond * 200):
-	}
-
-	s.cluster.ChangeLeader(region.GetID(), followerPeer.GetId())
-	currLeader.Lock()
-	currLeader.addr = follower.addr
-	currLeader.peer = followerPeer
-	currLeader.Unlock()
-
-	res := <-resCh
-	elapsed := time.Since(startTime)
-
-	s.NoError(res.err)
-	s.Nil(res.resp.GetRegionError())
-	s.IsType(&kvrpcpb.PrewriteResponse{}, res.resp.Resp)
-	s.Less(elapsed, time.Millisecond*2500)
-	s.True(requestHandled)
-}
