diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index 5d5a927bb..3a20eac7f 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -79,6 +79,9 @@ func newReplicaSelectorV2( for _, op := range opts { op(&option) } + if req.ReplicaReadType == kv.ReplicaReadPreferLeader { + WithPerferLeader()(&option) + } return &replicaSelectorV2{ baseReplicaSelector: baseReplicaSelector{ regionCache: regionCache, @@ -166,17 +169,10 @@ func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) { return } } - preferLeader := req.ReplicaReadType == kv.ReplicaReadPreferLeader - if s.attempts > 1 { - if req.ReplicaReadType == kv.ReplicaReadMixed { - // For mixed read retry, prefer retry leader first. - preferLeader = true - } - } strategy := ReplicaSelectMixedStrategy{ leaderIdx: leaderIdx, tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader, - preferLeader: preferLeader, + preferLeader: s.option.preferLeader, leaderOnly: s.option.leaderOnly, learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, labels: s.option.labels, @@ -201,6 +197,14 @@ func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) { req.StaleRead = false req.ReplicaRead = s.isReadOnlyReq } + // Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode. + if s.option.preferLeader { + if s.target.peer.Id != s.region.GetLeaderPeerID() { + s.target.store.recordReplicaFlowsStats(toFollower) + } else { + s.target.store.recordReplicaFlowsStats(toLeader) + } + } } } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index bb6cd01a6..bdb8325b6 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -663,6 +663,69 @@ func TestReplicaReadAccessPathByCase(t *testing.T) { }, } s.True(s.runCaseAndCompare(ca)) + + s.changeRegionLeader(3) + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: false, + accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr, ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store1, replica-read: true, stale-read: false}", + "{addr: store2, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: fakeEpochNotMatch, + backoffCnt: 1, + backoffDetail: []string{"tikvServerBusy+1"}, + regionIsValid: false, + }, + } + s.True(s.runCaseAndCompare(ca)) + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadPreferLeader, + staleRead: false, + accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr, ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store1, replica-read: true, stale-read: false}", + "{addr: store2, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: fakeEpochNotMatch, + backoffCnt: 1, + backoffDetail: []string{"tikvServerBusy+1"}, + regionIsValid: false, + }, + } + s.True(s.runCaseAndCompare(ca)) + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadPreferLeader, + staleRead: false, + label: &metapb.StoreLabel{Key: "id", Value: "2"}, + accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr, ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store2, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store1, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: fakeEpochNotMatch, + backoffCnt: 1, + backoffDetail: []string{"tikvServerBusy+1"}, + regionIsValid: false, + }, + } + s.True(s.runCaseAndCompare(ca)) + s.changeRegionLeader(1) } func TestReplicaReadAccessPathByCase2(t *testing.T) {