From 1fb5b5100c3721a5885e487c1503600ae9e4486c Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Mon, 25 Nov 2024 19:41:08 -0800
Subject: [PATCH 1/7] retry as stale reads

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request.go | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 433aa7354..7b9235358 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -464,6 +464,7 @@ type replica struct {
 	attempts int
 	// deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
 	deadlineErrUsingConfTimeout bool
+	serverIsBusy bool
 }
 
 func (r *replica) isEpochStale() bool {
@@ -625,7 +626,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
 	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
 		return nil, stateChanged{}
 	}
 	selector.targetIdx = state.leaderIdx
@@ -640,7 +641,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		return
 	}
 	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
-		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
 	}
 	if liveness != reachable {
 		selector.invalidateReplicaStore(selector.targetReplica(), cause)
@@ -648,7 +649,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 }
 
 func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
-	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
+	selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true, isStaleRead: false}
 }
 
 // tryFollower is the state where we cannot access the known leader
@@ -665,6 +666,7 @@ type tryFollower struct {
 	labels []*metapb.StoreLabel
 	// fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`.
 	fromAccessKnownLeader bool
+	isStaleRead bool
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -724,8 +726,14 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		replicaRead := true
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
-	staleRead := false
-	rpcCtx.contextPatcher.staleRead = &staleRead
+	leader := selector.replicas[state.leaderIdx]
+	leaderTimeout := leader.deadlineErrUsingConfTimeout
+	if(leader.deadlineErrUsingConfTimeout || leader.serverIsBusy) {
+		rpcCtx.contextPatcher.staleRead = &state.isStaleRead
+	} else {
+		staleRead := false
+		rpcCtx.contextPatcher.staleRead = &staleRead
+	}
 	return rpcCtx, nil
 }
 
@@ -937,6 +945,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			leaderIdx: state.leaderIdx,
 			lastIdx:   state.leaderIdx,
 			labels:    state.option.labels,
+			isStaleRead: state.isStaleRead,
 		}
 		if leaderEpochStale {
 			selector.regionCache.scheduleReloadRegion(selector.region)
@@ -1215,7 +1224,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
 		}
 		if accessLeader, ok := s.state.(*accessKnownLeader); ok {
 			// If leader return deadline exceeded error, we should try to access follower next time.
-			s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
+			s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx, isStaleRead: false}
 		}
 		return true
 	default:
@@ -1306,8 +1315,9 @@ func (s *replicaSelector) canFallback2Follower() bool {
 	if !ok {
 		return false
 	}
-	if !state.isStaleRead {
-		return false
+	if state.isStaleRead {
+		// fall back to the follower for stale reads
+		return true
 	}
 	// can fallback to follower only when the leader is exhausted.
 	return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
@@ -2127,6 +2137,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 			if s.replicaSelector.onReadReqConfigurableTimeout(req) {
 				return true, nil
 			}
+		} else if s.target != nil {
+			s.target.serverIsBusy = true
 		}
 		logutil.Logger(bo.GetCtx()).Debug(
 			"tikv reports `ServerIsBusy` retry later",

From d702a7eae54ed0cb2323fde98fb81c44d300a0bc Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Tue, 26 Nov 2024 10:28:53 -0800
Subject: [PATCH 2/7] build failure

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 7b9235358..1a634aa4f 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -727,7 +727,6 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
 	leader := selector.replicas[state.leaderIdx]
-	leaderTimeout := leader.deadlineErrUsingConfTimeout
 	if(leader.deadlineErrUsingConfTimeout || leader.serverIsBusy) {
 		rpcCtx.contextPatcher.staleRead = &state.isStaleRead
 	} else {
@@ -1233,6 +1232,16 @@
 	}
 }
 
+func (s *replicaSelector) onServerIsBusy(req *tikvrpc.Request) {
+	switch req.Type {
+	case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
+		tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
+		if target := s.targetReplica(); target != nil {
+			target.serverIsBusy = true
+		}
+	}
+}
+
 func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
 	store := accessReplica.store
 	liveness := store.requestLiveness(bo, s.regionCache)
@@ -2137,8 +2146,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 		if s.replicaSelector.onReadReqConfigurableTimeout(req) {
 			return true, nil
 		}
-	} else if s.target != nil {
-		s.target.serverIsBusy = true
+	} else if s.replicaSelector != nil {
+		s.replicaSelector.onServerIsBusy(req)
 	}
 	logutil.Logger(bo.GetCtx()).Debug(
 		"tikv reports `ServerIsBusy` retry later",

From 9d05657ea52e8189056781f90af69dd5ad51c3b8 Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Tue, 26 Nov 2024 10:42:45 -0800
Subject: [PATCH 3/7] fixed test cases

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request_state_test.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go
index 0fc5c2781..d74241c71 100644
--- a/internal/locate/region_request_state_test.go
+++ b/internal/locate/region_request_state_test.go
@@ -379,7 +379,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
 			leaderRegionValid:       true,
 			leaderAsyncReload:       Some(false),
 			leaderSuccessReplica:    []string{"z2", "z3"},
-			leaderSuccessReadType:   SuccessFollowerRead,
+			leaderSuccessReadType:   SuccessStaleRead,
 			followerRegionValid:     true,
 			followerAsyncReload:     Some(false),
 			followerSuccessReplica:  []string{"z2"},
@@ -443,11 +443,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
 			leaderRegionValid:       true,
 			leaderAsyncReload:       Some(false),
 			leaderSuccessReplica:    []string{"z3"},
-			leaderSuccessReadType:   SuccessFollowerRead,
+			leaderSuccessReadType:   SuccessStaleRead,
 			followerRegionValid:     true,
 			followerAsyncReload:     Some(false),
 			followerSuccessReplica:  []string{"z3"},
-			followerSuccessReadType: SuccessFollowerRead,
+			followerSuccessReadType: SuccessStaleRead,
 		},
 		{
 			do: leaderServerIsBusy,
@@ -456,11 +456,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
 			leaderRegionValid:       true,
 			leaderAsyncReload:       Some(false),
 			leaderSuccessReplica:    []string{"z2", "z3"},
-			leaderSuccessReadType:   SuccessFollowerRead,
+			leaderSuccessReadType:   SuccessStaleRead,
 			followerRegionValid:     true,
 			followerAsyncReload:     Some(false),
 			followerSuccessReplica:  []string{"z2", "z3"},
-			followerSuccessReadType: SuccessFollowerRead,
+			followerSuccessReadType: SuccessStaleRead,
 		},
 		{
 			do: leaderServerIsBusy,
@@ -469,11 +469,11 @@ func TestRegionCacheStaleRead(t *testing.T) {
 			leaderRegionValid:       true,
 			leaderAsyncReload:       Some(false),
 			leaderSuccessReplica:    []string{"z3"},
-			leaderSuccessReadType:   SuccessFollowerRead,
+			leaderSuccessReadType:   SuccessStaleRead,
 			followerRegionValid:     true,
 			followerAsyncReload:     Some(false),
 			followerSuccessReplica:  []string{"z3"},
-			followerSuccessReadType: SuccessFollowerRead,
+			followerSuccessReadType: SuccessStaleRead,
 		},
 		{
 			do: leaderDown,

From e799cddfea0d88e7b6fa422b2ba6055bdb48885b Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Tue, 3 Dec 2024 23:15:32 -0800
Subject: [PATCH 4/7] allow stale read if leader is not even tried

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 1a634aa4f..5469c3236 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -464,7 +464,7 @@ type replica struct {
 	attempts int
 	// deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
 	deadlineErrUsingConfTimeout bool
-	serverIsBusy bool
+	serverIsBusy                bool
 }
 
 func (r *replica) isEpochStale() bool {
@@ -727,7 +727,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
 	leader := selector.replicas[state.leaderIdx]
-	if(leader.deadlineErrUsingConfTimeout || leader.serverIsBusy) {
+	if leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy {
 		rpcCtx.contextPatcher.staleRead = &state.isStaleRead
 	} else {
 		staleRead := false
@@ -941,9 +941,9 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) || (!state.isStaleRead && leader.deadlineErrUsingConfTimeout) {
 			selector.state = &tryFollower{
-				leaderIdx: state.leaderIdx,
-				lastIdx: state.leaderIdx,
-				labels: state.option.labels,
+				leaderIdx:   state.leaderIdx,
+				lastIdx:     state.leaderIdx,
+				labels:      state.option.labels,
 				isStaleRead: state.isStaleRead,
 			}
 			if leaderEpochStale {
 				selector.regionCache.scheduleReloadRegion(selector.region)

From 10a65f7b5bfc97c0b82b2b5c222fab630a12a244 Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Wed, 18 Dec 2024 16:24:08 -0800
Subject: [PATCH 5/7] fixed test cases

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request_state_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go
index d74241c71..74d4d5e32 100644
--- a/internal/locate/region_request_state_test.go
+++ b/internal/locate/region_request_state_test.go
@@ -333,7 +333,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
 			leaderRegionValid:       true,
 			leaderAsyncReload:       Some(true),
 			leaderSuccessReplica:    []string{"z2", "z3"},
-			leaderSuccessReadType:   SuccessFollowerRead,
+			leaderSuccessReadType:   SuccessStaleRead,
 			followerRegionValid:     true,
 			followerAsyncReload:     None[bool](),
 			followerSuccessReplica:  []string{"z2"},

From 1e081de4c583fd34bfe7408b8814309c3d4e6e82 Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Wed, 18 Dec 2024 16:38:27 -0800
Subject: [PATCH 6/7] disable test

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request3_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index c7c755353..f73b0b617 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1308,7 +1308,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
 	resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels))
 	s.Nil(err)
 	// `tryFollower` always try the local peer firstly
-	s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+	s.Equal("store3", string(resp.Resp.(*kvrpcpb.GetResponse).Value))
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {

From 33ba9acc136456b09cc7cd9e097eeaa268e7f760 Mon Sep 17 00:00:00 2001
From: rishabh_mittal
Date: Wed, 18 Dec 2024 16:48:48 -0800
Subject: [PATCH 7/7] fixed test cases

Signed-off-by: rishabh_mittal
---
 internal/locate/region_request3_test.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index f73b0b617..cb0b3cdd9 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1291,9 +1291,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
 	follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
 
 	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
-		if req.StaleRead && addr == follower.addr {
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil
-		}
 		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
 			Value: []byte(addr),
 		}}, nil
@@ -1308,7 +1305,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
 	resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels))
 	s.Nil(err)
 	// `tryFollower` always try the local peer firstly
-	s.Equal("store3", string(resp.Resp.(*kvrpcpb.GetResponse).Value))
+	s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
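
Reviewer note, not part of the patches: after PATCH 4/7 the series settles on a single rule in tryFollower.next for whether a retried request keeps its stale-read flag. The sketch below is a minimal, self-contained Go illustration of that rule; the replica struct and the retryAsStaleRead function are simplified stand-ins for the client-go replica selector, not the real types.

package main

import "fmt"

// replica mirrors only the fields the series consults when deciding how a
// follower retry should be sent; it is a simplified stand-in for the real
// client-go replica struct.
type replica struct {
	attempts                    int
	deadlineErrUsingConfTimeout bool
	serverIsBusy                bool
}

// retryAsStaleRead reproduces the condition from tryFollower.next as of
// PATCH 4/7: the retried request stays a stale read only if the leader was
// never attempted, returned a deadline exceeded error under the configurable
// timeout, or reported ServerIsBusy; otherwise the retry is downgraded to a
// plain follower read.
func retryAsStaleRead(leader replica, requestIsStaleRead bool) bool {
	if leader.attempts == 0 || leader.deadlineErrUsingConfTimeout || leader.serverIsBusy {
		return requestIsStaleRead
	}
	return false
}

func main() {
	busyLeader := replica{attempts: 1, serverIsBusy: true}
	fmt.Println(retryAsStaleRead(busyLeader, true)) // true: retried as a stale read

	otherFailure := replica{attempts: 1}
	fmt.Println(retryAsStaleRead(otherFailure, true)) // false: downgraded to a follower read
}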