refine onNotLeader logic when new leader is not available (#1178)
* refine onNotLeader logic when new leader is not available #1169

Signed-off-by: crazycs520 <[email protected]>

* refine logic and add test

Signed-off-by: crazycs520 <[email protected]>

* add more test cases for ReplicaReadLeader with kv_read_timeout

Signed-off-by: crazycs520 <[email protected]>

* add comment

Signed-off-by: crazycs520 <[email protected]>

* fix the issue where a leader with deadlineErr is retried again, and add a test for it

Signed-off-by: crazycs520 <[email protected]>

* refine code and comment

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
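
The core of the change can be summed up in a small hedged sketch before reading the full diff below. The types and constants here are simplified stand-ins rather than the real client-go structs: a leader reported by a NotLeader error is only worth switching to when it is reachable, not exhausted, and has not already failed with a deadline exceeded error under tikv_client_read_timeout.

package main

import "fmt"

// livenessState and replica are reduced stand-ins for the real types in
// internal/locate/region_request.go; only the fields the rule needs are kept.
type livenessState int

const (
	reachable livenessState = iota
	unreachable
)

type replica struct {
	liveness                    livenessState
	attempts                    int
	deadlineErrUsingConfTimeout bool // set after a deadline exceeded error under tikv_client_read_timeout
}

const maxReplicaAttempt = 10

// isCandidate captures the essence of the new accessKnownLeader.isCandidate:
// an unreachable, exhausted, or already-timed-out leader is not retried, so
// the selector can fall back to followers or keep its current state.
func isCandidate(leader *replica) bool {
	return leader.liveness == reachable &&
		leader.attempts < maxReplicaAttempt &&
		!leader.deadlineErrUsingConfTimeout
}

func main() {
	timedOut := &replica{liveness: reachable, deadlineErrUsingConfTimeout: true}
	healthy := &replica{liveness: reachable}
	fmt.Println(isCandidate(timedOut)) // false: don't send the request to this leader again
	fmt.Println(isCandidate(healthy))  // true: safe to switch to this leader
}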
crazycs520 authored Feb 26, 2024
1 parent 6f9550f commit 93fff7c
Showing 2 changed files with 143 additions and 7 deletions.
27 changes: 21 additions & 6 deletions internal/locate/region_request.go
@@ -413,11 +413,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
selector.state = &tryNewProxy{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
}
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be wakened up and re-elect the leader until the follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
if liveness != reachable || leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
if !state.isCandidate(leader) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
return nil, stateChanged{}
}
@@ -434,6 +430,20 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
return selector.buildRPCContext(bo)
}

// isCandidate checks whether the leader is a valid candidate to send the request to.
func (state *accessKnownLeader) isCandidate(leader *replica) bool {
liveness := leader.store.getLivenessState()
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be woken up to re-elect a new leader until a follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeouts.
// If leader.deadlineErrUsingConfTimeout is true, the leader has already been tried and returned a deadline exceeded error, so don't retry it.
if liveness != reachable || leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) || leader.deadlineErrUsingConfTimeout {
return false
}
return true
}

func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
liveness := selector.checkLiveness(bo, selector.targetReplica())
// Only enable forwarding when unreachable to avoid using proxy to access a TiKV that cannot serve.
@@ -1200,7 +1210,12 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
replica.attempts = maxReplicaAttempt - 1
replica.attemptedTime = 0
}
s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
state := &accessKnownLeader{leaderIdx: AccessIndex(i)}
if state.isCandidate(s.replicas[i]) {
// If the new leader is a candidate, switch to the new leader.
// The new leader may have deadlineErrUsingConfTimeout set and thus not be a candidate; if so, keep the state unchanged and retry the request.
s.state = state
}
// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
if !s.region.switchWorkLeaderToPeer(leader) {
panic("the store must exist")
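To see how the guarded state switch above behaves end to end, here is a hedged sketch using the same simplified stand-ins rather than the real selector types: when the leader reported by a NotLeader error is not a candidate (for example, it already returned a deadline exceeded error), the selector keeps its current state so the request is retried on the remaining replicas instead of the failed leader.

package main

import "fmt"

// Simplified stand-ins for the real selector types in region_request.go.
type replica struct {
	reachable                   bool
	deadlineErrUsingConfTimeout bool
}

type replicaSelector struct {
	replicas []*replica
	state    string // e.g. "accessKnownLeader(1)" or "tryFollower"
}

// updateLeader mirrors the guarded switch in the diff: move to the new leader
// only when it is a viable candidate, otherwise keep the state unchanged so
// the remaining replicas are retried instead of the failed leader.
func (s *replicaSelector) updateLeader(newLeaderIdx int) {
	leader := s.replicas[newLeaderIdx]
	if leader.reachable && !leader.deadlineErrUsingConfTimeout {
		s.state = fmt.Sprintf("accessKnownLeader(%d)", newLeaderIdx)
	}
	// Otherwise: state unchanged; the request falls back to other replicas.
}

func main() {
	s := &replicaSelector{
		replicas: []*replica{
			{reachable: true},                                    // store1
			{reachable: true, deadlineErrUsingConfTimeout: true}, // store2
		},
		state: "tryFollower",
	}

	s.updateLeader(1)    // the reported new leader already hit deadline exceeded
	fmt.Println(s.state) // still "tryFollower": retry other replicas

	s.updateLeader(0)    // a healthy leader
	fmt.Println(s.state) // "accessKnownLeader(0)"
}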
123 changes: 122 additions & 1 deletion internal/locate/replica_selector_test.go
@@ -99,6 +99,7 @@ func TestReplicaReadStaleReadAccessPathByCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()

fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} // fake region error, caused when no replica is available.
var ca replicaSelectorAccessPathCase
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
@@ -160,8 +161,128 @@ func TestReplicaReadStaleReadAccessPathByCase(t *testing.T) {
},
}
s.True(s.runCaseAndCompare(ca))
}

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
timeout: time.Second,
label: &metapb.StoreLabel{Key: "id", Value: "2"},
accessErr: []RegionErrorType{DataIsNotReadyErr, NotLeaderWithNewLeader2Err},
expect: &accessPathResult{
accessPath: []string{
"{addr: store2, replica-read: false, stale-read: true}",
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store2, replica-read: false, stale-read: false}"}, // retry the new leader.
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
timeout: time.Second,
label: &metapb.StoreLabel{Key: "id", Value: "2"},
accessErr: []RegionErrorType{DeadLineExceededErr, NotLeaderWithNewLeader2Err},
expect: &accessPathResult{
accessPath: []string{
"{addr: store2, replica-read: false, stale-read: true}",
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store3, replica-read: true, stale-read: false}"}, // store2 has DeadLineExceededErr, so don't retry store2 even it is new leader.
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadLeader,
staleRead: false,
timeout: time.Second,
accessErr: []RegionErrorType{DeadLineExceededErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store2, replica-read: true, stale-read: false}"},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadLeader,
staleRead: false,
timeout: time.Second,
accessErr: []RegionErrorType{NotLeaderWithNewLeader3Err, DeadLineExceededErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store3, replica-read: false, stale-read: false}",
"{addr: store2, replica-read: true, stale-read: false}"},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdPrewrite,
readType: kv.ReplicaReadLeader,
staleRead: false,
timeout: time.Second, // this actually has no effect on write requests, since tikv_client_read_timeout is used for read requests only.
accessErr: []RegionErrorType{NotLeaderWithNewLeader3Err, DeadLineExceededErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store3, replica-read: false, stale-read: false}", // try new leader in store3, but got DeadLineExceededErr, and this store's liveness will be mock to unreachable in test case running.
"{addr: store2, replica-read: false, stale-read: false}"}, // try remaining replica in store2.
respErr: "",
respRegionError: nil,
backoffCnt: 1,
backoffDetail: []string{"tikvRPC+1"},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadLeader,
staleRead: false,
timeout: time.Second,
accessErr: []RegionErrorType{NotLeaderErr, DeadLineExceededErr, NotLeaderWithNewLeader2Err},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store2, replica-read: false, stale-read: false}",
"{addr: store3, replica-read: false, stale-read: false}"},
respErr: "",
respRegionError: fakeEpochNotMatch,
backoffCnt: 1,
backoffDetail: []string{"regionScheduling+1"},
regionIsValid: false,
},
}
s.True(s.runCaseAndCompare(ca))
}
func (s *testReplicaSelectorSuite) runCaseAndCompare(ca2 replicaSelectorAccessPathCase) bool {
ca2.run(s)
if ca2.accessErrInValid {
