diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 433aa7354..5469c3236 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -464,6 +464,7 @@ type replica struct {
     attempts int
     // deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
     deadlineErrUsingConfTimeout bool
+    serverIsBusy                bool
 }
 
 func (r *replica) isEpochStale() bool {
@@ -625,7 +626,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
     // a request. So, before the new leader is elected, we should not send requests
     // to the unreachable old leader to avoid unnecessary timeout.
     if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
-        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
         return nil, stateChanged{}
     }
     selector.targetIdx = state.leaderIdx
@@ -640,7 +641,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
         return
     }
     if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
-        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+        selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
     }
     if liveness != reachable {
         selector.invalidateReplicaStore(selector.targetReplica(), cause)
@@ -648,7 +649,7 @@
 }
 
 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-    selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+    selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
 }
 
 // tryFollower is the state where we cannot access the known leader
@@ -665,6 +666,7 @@ type tryFollower struct {
     labels []*metapb.StoreLabel
     // fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`.
     fromAccessKnownLeader bool
+    isStaleRead           bool
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -724,8 +726,13 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
         replicaRead := true
         rpcCtx.contextPatcher.replicaRead = &replicaRead
     }
-    staleRead := false
-    rpcCtx.contextPatcher.staleRead = &staleRead
+    leader := selector.replicas[state.leaderIdx]
+    if leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy {
+        rpcCtx.contextPatcher.staleRead = &state.isStaleRead
+    } else {
+        staleRead := false
+        rpcCtx.contextPatcher.staleRead = &staleRead
+    }
     return rpcCtx, nil
 }
 
@@ -934,9 +941,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
         if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) || (!state.isStaleRead && leader.deadlineErrUsingConfTimeout) {
             selector.state = &tryFollower{
-                leaderIdx: state.leaderIdx,
-                lastIdx:   state.leaderIdx,
-                labels:    state.option.labels,
+                leaderIdx:   state.leaderIdx,
+                lastIdx:     state.leaderIdx,
+                labels:      state.option.labels,
+                isStaleRead: state.isStaleRead,
             }
             if leaderEpochStale {
                 selector.regionCache.scheduleReloadRegion(selector.region)
@@ -1215,7 +1223,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
         }
         if accessLeader, ok := s.state.(*accessKnownLeader); ok {
             // If leader return deadline exceeded error, we should try to access follower next time.
-            s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
+            s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx, isStaleRead: false}
         }
         return true
     default:
@@ -1224,6 +1232,16 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
     }
 }
 
+func (s *replicaSelector) onServerIsBusy(req *tikvrpc.Request) {
+    switch req.Type {
+    case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
+        tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
+        if target := s.targetReplica(); target != nil {
+            target.serverIsBusy = true
+        }
+    }
+}
+
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
     store := accessReplica.store
     liveness := store.requestLiveness(bo, s.regionCache)
@@ -1306,8 +1324,9 @@ func (s *replicaSelector) canFallback2Follower() bool {
     if !ok {
         return false
     }
-    if !state.isStaleRead {
-        return false
+    if state.isStaleRead {
+        // fallback to follower if it is stale reads
+        return true
     }
     // can fallback to follower only when the leader is exhausted.
     return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
@@ -2127,6 +2146,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
             if s.replicaSelector.onReadReqConfigurableTimeout(req) {
                 return true, nil
             }
+        } else if s.replicaSelector != nil {
+            s.replicaSelector.onServerIsBusy(req)
         }
         logutil.Logger(bo.GetCtx()).Debug(
             "tikv reports `ServerIsBusy` retry later",
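Note (not part of the patch): the region_request.go hunks above make a follower retry keep the request's original stale-read flag whenever the leader was never attempted, returned a deadline-exceeded error under the configurable timeout, or reported ServerIsBusy; otherwise the retry is downgraded to a plain, non-stale follower read. Below is a minimal, self-contained sketch of that decision under simplified types; replicaInfo and staleReadFlagForFollowerRetry are illustrative names, not identifiers from this repository.

package main

import "fmt"

// replicaInfo mirrors, in simplified form, the leader-replica fields the patch
// consults: attempts, deadlineErrUsingConfTimeout, and the new serverIsBusy flag.
type replicaInfo struct {
    attempts                    int
    deadlineErrUsingConfTimeout bool
    serverIsBusy                bool
}

// staleReadFlagForFollowerRetry models the staleRead value that tryFollower.next
// patches into the RPC context: keep the original stale-read flag if the leader
// was never tried, timed out under the configurable timeout, or was busy;
// otherwise fall back to a plain (non-stale) follower read.
func staleReadFlagForFollowerRetry(leader replicaInfo, wasStaleRead bool) bool {
    if leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy {
        return wasStaleRead
    }
    return false
}

func main() {
    busyLeader := replicaInfo{attempts: 1, serverIsBusy: true}
    fmt.Println(staleReadFlagForFollowerRetry(busyLeader, true)) // true: the retry stays a stale read
    triedLeader := replicaInfo{attempts: 1}
    fmt.Println(staleReadFlagForFollowerRetry(triedLeader, true)) // false: downgraded to a follower read
}

This is also why the TestRegionCacheStaleRead expectations in the test diffs below change from SuccessFollowerRead to SuccessStaleRead for the leaderServerIsBusy cases.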
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index c7c755353..cb0b3cdd9 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1291,9 +1291,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
     follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
 
     s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
-        if req.StaleRead && addr == follower.addr {
-            return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil
-        }
         return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
             Value: []byte(addr),
         }}, nil
diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go
index 0fc5c2781..74d4d5e32 100644
--- a/internal/locate/region_request_state_test.go
+++ b/internal/locate/region_request_state_test.go
@@ -333,7 +333,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
             leaderRegionValid:    true,
             leaderAsyncReload:    Some(true),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid:    true,
             followerAsyncReload:    None[bool](),
             followerSuccessReplica: []string{"z2"},
@@ -379,7 +379,7 @@
             leaderRegionValid:    true,
             leaderAsyncReload:    Some(false),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid:    true,
             followerAsyncReload:    Some(false),
             followerSuccessReplica: []string{"z2"},
@@ -443,11 +443,11 @@
             leaderRegionValid:    true,
             leaderAsyncReload:    Some(false),
             leaderSuccessReplica: []string{"z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid:    true,
             followerAsyncReload:    Some(false),
             followerSuccessReplica: []string{"z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderServerIsBusy,
@@ -456,11 +456,11 @@
             leaderRegionValid:    true,
             leaderAsyncReload:    Some(false),
             leaderSuccessReplica: []string{"z2", "z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid:    true,
             followerAsyncReload:    Some(false),
             followerSuccessReplica: []string{"z2", "z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderServerIsBusy,
@@ -469,11 +469,11 @@
             leaderRegionValid:    true,
             leaderAsyncReload:    Some(false),
             leaderSuccessReplica: []string{"z3"},
-            leaderSuccessReadType: SuccessFollowerRead,
+            leaderSuccessReadType: SuccessStaleRead,
             followerRegionValid:    true,
             followerAsyncReload:    Some(false),
             followerSuccessReplica: []string{"z3"},
-            followerSuccessReadType: SuccessFollowerRead,
+            followerSuccessReadType: SuccessStaleRead,
         },
         {
             do: leaderDown,