diff --git a/config/retry/config.go b/config/retry/config.go index 89a9bc382d..c95d2cbd15 100644 --- a/config/retry/config.go +++ b/config/retry/config.go @@ -46,6 +46,7 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -191,6 +192,11 @@ func newBackoffFn(base, cap, jitter int) backoffFn { if maxSleepMs >= 0 && realSleep > maxSleepMs { realSleep = maxSleepMs } + if _, err := util.EvalFailpoint("fastBackoffBySkipSleep"); err == nil { + attempts++ + lastSleep = sleep + return realSleep + } select { case <-time.After(time.Duration(realSleep) * time.Millisecond): attempts++ diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index e8f652e00f..d1b511e256 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2843,6 +2843,9 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff reResolveInterval = dur } } + if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil { + return + } go s.checkUntilHealth(c, liveness, reResolveInterval) } return diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index efcafbf162..655b47ab97 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -73,6 +73,9 @@ import ( // network error because tidb-server expect tikv client to exit as soon as possible. var shuttingDown uint32 +// randIntn is only use for testing. +var randIntn = rand.Intn + // StoreShuttingDown atomically stores ShuttingDown into v. func StoreShuttingDown(v uint32) { atomic.StoreUint32(&shuttingDown, v) @@ -612,7 +615,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) ( } // Skip advanceCnt valid candidates to find a proxy peer randomly - advanceCnt := rand.Intn(candidateNum) + advanceCnt := randIntn(candidateNum) for idx, replica := range selector.replicas { if !state.isCandidate(AccessIndex(idx), replica) { continue @@ -668,13 +671,13 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector resetStaleRead := false if state.lastIdx < 0 { if state.tryLeader { - state.lastIdx = AccessIndex(rand.Intn(replicaSize)) + state.lastIdx = AccessIndex(randIntn(replicaSize)) } else { if replicaSize <= 1 { state.lastIdx = state.leaderIdx } else { // Randomly select a non-leader peer - state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1)) + state.lastIdx = AccessIndex(randIntn(replicaSize - 1)) if state.lastIdx >= state.leaderIdx { state.lastIdx++ } @@ -696,7 +699,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } var offset int if state.lastIdx >= 0 { - offset = rand.Intn(replicaSize) + offset = randIntn(replicaSize) } reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { @@ -791,16 +794,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } func (state *accessFollower) IsLeaderExhausted(leader *replica) bool { - // Allow another extra retry for the following case: - // 1. The stale read is enabled and leader peer is selected as the target peer at first. - // 2. Data is not ready is returned from the leader peer. - // 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer. - // 4. The leader peer should be retried again using snapshot read. - if state.isStaleRead && state.option.leaderOnly { - return leader.isExhausted(2, 0) - } else { - return leader.isExhausted(1, 0) - } + return leader.isExhausted(1, 0) } func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { @@ -845,7 +839,7 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector // Select a follower replica that has the lowest estimated wait duration minWait := time.Duration(math.MaxInt64) targetIdx := state.leaderIdx - startIdx := rand.Intn(len(selector.replicas)) + startIdx := randIntn(len(selector.replicas)) for i := 0; i < len(selector.replicas); i++ { idx := (i + startIdx) % len(selector.replicas) r := selector.replicas[idx] diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index b2ede8ab64..36e1f71bba 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -759,6 +759,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other // tests but it's a dedicated one to test sending requests with the replica selector. func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { + s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`)) + defer func() { + s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep")) + }() req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), @@ -987,49 +991,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { for _, store := range s.storeIDs { s.cluster.StartStore(store) } - - // Verify switch to the leader immediately when stale read requests with global txn scope meet region errors. - s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0]) - reachable.injectConstantLiveness(s.cache) - s.Eventually(func() bool { - stores := s.regionRequestSender.replicaSelector.regionStore.stores - return stores[0].getLivenessState() == reachable && - stores[1].getLivenessState() == reachable && - stores[2].getLivenessState() == reachable - }, 3*time.Second, 200*time.Millisecond) - reloadRegion() - req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}) - req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope - for i := 0; i < 10; i++ { - req.EnableStaleWithMixedReplicaRead() - // The request may be sent to the leader directly. We have to distinguish it. - failureOnFollower := 0 - failureOnLeader := 0 - s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { - if addr != s.cluster.GetStore(s.storeIDs[0]).Address { - failureOnFollower++ - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil - } else if failureOnLeader == 0 && i%2 == 0 { - failureOnLeader++ - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil - } else { - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil - } - }} - sender.SendReq(bo, req, region.Region, time.Second) - state, ok := sender.replicaSelector.state.(*accessFollower) - s.True(ok) - s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed - if failureOnFollower == 0 && failureOnLeader == 0 { - // if the request goes to the leader and succeeds then it is executed as a StaleRead - s.True(req.StaleRead) - } else { - // otherwise #leaderOnly flag should be set and retry request as a normal read - s.True(state.option.leaderOnly) - s.False(req.StaleRead) - } - } } func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { @@ -1223,62 +1184,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() } } -func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { - leaderStore, _ := s.loadAndGetLeaderStore() - leaderLabel := []*metapb.StoreLabel{ - { - Key: "id", - Value: strconv.FormatUint(leaderStore.StoreID(), 10), - }, - } - regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) - s.Nil(err) - s.NotNil(regionLoc) - value := []byte("value") - isFirstReq := true - - s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { - select { - case <-ctx.Done(): - return nil, errors.New("timeout") - default: - } - // Return `DataIsNotReady` for the first time on leader. - if isFirstReq { - isFirstReq = false - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ - DataIsNotReady: &errorpb.DataIsNotReady{}, - }}}, nil - } - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil - }} - - region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID()) - s.True(region.isValid()) - - req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) - req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope - req.EnableStaleWithMixedReplicaRead() - req.ReplicaReadType = kv.ReplicaReadMixed - var ops []StoreSelectorOption - ops = append(ops, WithMatchLabels(leaderLabel)) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - bo := retry.NewBackoffer(ctx, -1) - s.Nil(err) - resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) - s.Nil(err) - - regionErr, err := resp.GetRegionError() - s.Nil(err) - s.Nil(regionErr) - getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) - s.True(ok) - s.Equal(getResp.Value, value) -} - func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { leaderAddr := "" reqTargetAddrs := make(map[string]struct{}) diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index 6a92b52767..c94552b2f2 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -348,17 +348,6 @@ func TestRegionCacheStaleRead(t *testing.T) { followerSuccessReplica: []string{"z2"}, followerSuccessReadType: SuccessStaleRead, }, - { - do: leaderDataIsNotReady, - leaderRegionValid: true, - leaderAsyncReload: Some(false), - leaderSuccessReplica: []string{"z1"}, - leaderSuccessReadType: SuccessLeaderRead, - followerRegionValid: true, - followerAsyncReload: Some(false), - followerSuccessReplica: []string{"z2"}, - followerSuccessReadType: SuccessStaleRead, - }, { do: followerDataIsNotReady, leaderRegionValid: true, @@ -395,45 +384,6 @@ func TestRegionCacheStaleRead(t *testing.T) { followerSuccessReplica: []string{"z1"}, followerSuccessReadType: SuccessLeaderRead, }, - { - do: leaderDataIsNotReady, - extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy}, - recoverable: true, - leaderRegionValid: true, - leaderAsyncReload: Some(false), - leaderSuccessReplica: []string{"z1"}, - leaderSuccessReadType: SuccessLeaderRead, - followerRegionValid: true, - followerAsyncReload: Some(false), - followerSuccessReplica: []string{"z1"}, - followerSuccessReadType: SuccessLeaderRead, - }, - { - do: leaderDataIsNotReady, - extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady}, - recoverable: true, - leaderRegionValid: true, - leaderAsyncReload: Some(false), - leaderSuccessReplica: []string{"z1"}, - leaderSuccessReadType: SuccessLeaderRead, - followerRegionValid: true, - followerAsyncReload: Some(false), - followerSuccessReplica: []string{"z1"}, - followerSuccessReadType: SuccessLeaderRead, - }, - { - do: leaderDataIsNotReady, - extra: []func(suite *testRegionCacheStaleReadSuite){followerDown}, - recoverable: true, - leaderRegionValid: true, - leaderAsyncReload: Some(false), - leaderSuccessReplica: []string{"z1"}, - leaderSuccessReadType: SuccessLeaderRead, - followerRegionValid: true, - followerAsyncReload: Some(false), - followerSuccessReplica: []string{"z1"}, - followerSuccessReadType: SuccessLeaderRead, - }, { do: leaderServerIsBusy, extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy}, @@ -598,10 +548,11 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon return } + msg := fmt.Sprintf("%v %#v", string(resp.Resp.(*kvrpcpb.GetResponse).Value), r) _, successZone, successReadType := s.extractResp(resp) find := false if leaderZone { - s.Equal(r.leaderSuccessReadType, successReadType) + s.Equal(r.leaderSuccessReadType, successReadType, msg) for _, z := range r.leaderSuccessReplica { if z == successZone { find = true @@ -617,7 +568,7 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon } } } - s.True(find) + s.True(find, msg) } type Option[T interface{}] struct { @@ -767,22 +718,6 @@ func leaderDownAndElect(s *testRegionCacheStaleReadSuite) { s.setUnavailableStore(leader.Id) } -func leaderDataIsNotReady(s *testRegionCacheStaleReadSuite) { - peerID, _ := s.getLeader() - s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { - if !req.StaleRead || zone != "z1" { - return nil - } - return &errorpb.Error{ - DataIsNotReady: &errorpb.DataIsNotReady{ - RegionId: s.regionID, - PeerId: peerID, - SafeTs: 0, - }, - } - } -} - func leaderServerIsBusy(s *testRegionCacheStaleReadSuite) { s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { if zone != "z1" { diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 58d046a2d1..1d12d1f968 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -679,42 +679,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch s.Equal(target, client.closedAddr) } -func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() { - req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ - Key: []byte("key"), - }) - req.EnableStaleWithMixedReplicaRead() - req.ReadReplicaScope = "z1" // not global stale read. - region, err := s.cache.LocateRegionByID(s.bo, s.region) - s.Nil(err) - s.NotNil(region) - - oc := s.regionRequestSender.client - defer func() { - s.regionRequestSender.client = oc - }() - - s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { - if req.StaleRead { - // Mock for stale-read request always return DataIsNotReady error when tikv `ResolvedTS` is blocked. - response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ - RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}, - }} - } else { - response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}} - } - return response, nil - }} - - bo := retry.NewBackofferWithVars(context.Background(), 5, nil) - resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) - s.Nil(err) - s.NotNil(resp) - regionErr, _ := resp.GetRegionError() - s.Nil(regionErr) - s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) -} - func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() { config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 0 diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go new file mode 100644 index 0000000000..3e1b40caa0 --- /dev/null +++ b/internal/locate/replica_selector_test.go @@ -0,0 +1,533 @@ +package locate + +import ( + "context" + "fmt" + "math/rand" + "sort" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config/retry" + "github.com/tikv/client-go/v2/internal/apicodec" + "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikvrpc" +) + +type testReplicaSelectorSuite struct { + suite.Suite + cluster *mocktikv.Cluster + storeIDs []uint64 + peerIDs []uint64 + regionID uint64 + leaderPeer uint64 + cache *RegionCache + bo *retry.Backoffer + mvccStore mocktikv.MVCCStore +} + +func (s *testReplicaSelectorSuite) SetupTest(t *testing.T) { + s.mvccStore = mocktikv.MustNewMVCCStore() + s.cluster = mocktikv.NewCluster(s.mvccStore) + s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3) + pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.cache = NewRegionCache(pdCli) + s.bo = retry.NewNoopBackoff(context.Background()) + s.SetT(t) + s.SetS(s) + + randIntn = func(n int) int { return 0 } + s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`)) + s.NoError(failpoint.Enable("tikvclient/skipStoreCheckUntilHealth", `return`)) + + loc, err := s.cache.LocateKey(s.bo, []byte("key")) + s.Nil(err) + r := s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(r) + // The following assumptions are made in the latter tests, which should be checked in advance: + s.Equal(r.GetLeaderStoreID(), uint64(1)) // region's leader in store1. + s.Equal(len(r.getStore().stores), 3) // region has 3 peer(stores). + for _, store := range r.getStore().stores { + s.Equal(store.labels[0].Key, "id") // Each store has a label "id", and the value is the store's ID. + s.Equal(store.labels[0].Value, fmt.Sprintf("%v", store.storeID)) + } +} + +func (s *testReplicaSelectorSuite) TearDownTest() { + s.cache.Close() + s.mvccStore.Close() + + randIntn = rand.Intn + s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep")) + s.NoError(failpoint.Disable("tikvclient/skipStoreCheckUntilHealth")) +} + +type replicaSelectorAccessPathCase struct { + reqType tikvrpc.CmdType + readType kv.ReplicaReadType + staleRead bool + timeout time.Duration + label *metapb.StoreLabel + accessErr []RegionErrorType + accessErrInValid bool + accessPathResult // use to record the execution result. + expect *accessPathResult // +} + +type accessPathResult struct { + accessPath []string + respErr string + respRegionError *errorpb.Error + backoffCnt int + backoffDetail []string + regionIsValid bool +} + +func TestReplicaReadStaleReadAccessPathByCase(t *testing.T) { + s := new(testReplicaSelectorSuite) + s.SetupTest(t) + defer s.TearDownTest() + + var ca replicaSelectorAccessPathCase + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: true, + accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store1, replica-read: false, stale-read: true}", + "{addr: store2, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: nil, + backoffCnt: 1, + backoffDetail: []string{"tikvServerBusy+1"}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + + // test stale read with label. + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: true, + label: &metapb.StoreLabel{Key: "id", Value: "2"}, + accessErr: []RegionErrorType{DataIsNotReadyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store2, replica-read: false, stale-read: true}", + "{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read. + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: true, + label: &metapb.StoreLabel{Key: "id", Value: "2"}, + accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store2, replica-read: false, stale-read: true}", + "{addr: store1, replica-read: false, stale-read: false}", + "{addr: store2, replica-read: true, stale-read: false}"}, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) +} + +func (s *testReplicaSelectorSuite) runCaseAndCompare(ca2 replicaSelectorAccessPathCase) bool { + ca2.run(s) + if ca2.accessErrInValid { + // the case has been marked as invalid, just ignore it. + return false + } + if ca2.expect != nil { + msg := fmt.Sprintf("%v\n\n", ca2.Format()) + expect := ca2.expect + result := ca2.accessPathResult + s.Equal(expect.accessPath, result.accessPath, msg) + s.Equal(expect.respErr, result.respErr, msg) + s.Equal(expect.respRegionError, result.respRegionError, msg) + s.Equal(expect.regionIsValid, result.regionIsValid, msg) + s.Equal(expect.backoffCnt, result.backoffCnt, msg) + s.Equal(expect.backoffDetail, result.backoffDetail, msg) + } + return true +} + +func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) { + reachable.injectConstantLiveness(s.cache) // inject reachable liveness. + msg := ca.Format() + access := []string{} + fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + idx := len(access) + access = append(access, fmt.Sprintf("{addr: %v, replica-read: %v, stale-read: %v}", addr, req.ReplicaRead, req.StaleRead)) + if idx < len(ca.accessErr) { + if !ca.accessErr[idx].Valid(addr, req) { + // mark this case is invalid. just ignore this case. + ca.accessErrInValid = true + } else { + loc, err := s.cache.LocateKey(s.bo, []byte("key")) + s.Nil(err) + rc := s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(rc) + regionErr, err := ca.genAccessErr(s.cache, rc, ca.accessErr[idx]) + if regionErr != nil { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + RegionError: regionErr, + }}, nil + } + if err != nil { + return nil, err + } + } + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + Value: []byte("hello world"), + }}, nil + }} + sender := NewRegionRequestSender(s.cache, fnClient) + var req *tikvrpc.Request + switch ca.reqType { + case tikvrpc.CmdGet: + req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: []byte("key"), + }) + case tikvrpc.CmdPrewrite: + req = tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}) + default: + s.FailNow("unsupported reqType " + ca.reqType.String()) + } + if ca.staleRead { + req.EnableStaleWithMixedReplicaRead() + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + } else { + req.ReplicaReadType = ca.readType + req.ReplicaRead = ca.readType.IsFollowerRead() + } + opts := []StoreSelectorOption{} + if ca.label != nil { + opts = append(opts, WithMatchLabels([]*metapb.StoreLabel{ca.label})) + } + // reset slow score, since serverIsBusyErr will mark the store is slow, and affect remaining test cases. + loc, err := s.cache.LocateKey(s.bo, []byte("key")) + s.Nil(err) + rc := s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(rc) + for _, store := range rc.getStore().stores { + store.slowScore.resetSlowScore() + atomic.StoreUint32(&store.livenessState, uint32(reachable)) + store.setResolveState(resolved) + } + + bo := retry.NewBackofferWithVars(context.Background(), 40000, nil) + timeout := ca.timeout + if timeout == 0 { + timeout = client.ReadTimeoutShort + } + resp, _, _, err := sender.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, opts...) + if err == nil { + s.NotNil(resp, msg) + regionErr, err := resp.GetRegionError() + s.Nil(err, msg) + ca.respRegionError = regionErr + } else { + ca.respErr = err.Error() + } + ca.accessPath = access + ca.backoffCnt = bo.GetTotalBackoffTimes() + detail := make([]string, 0, len(bo.GetBackoffTimes())) + for tp, cnt := range bo.GetBackoffTimes() { + detail = append(detail, fmt.Sprintf("%v+%v", tp, cnt)) + } + sort.Strings(detail) + ca.backoffDetail = detail + ca.regionIsValid = sender.replicaSelector.region.isValid() + sender.replicaSelector.invalidateRegion() // invalidate region to reload for next test case. +} + +func (ca *replicaSelectorAccessPathCase) genAccessErr(regionCache *RegionCache, r *Region, accessErr RegionErrorType) (regionErr *errorpb.Error, err error) { + genNotLeaderErr := func(storeID uint64) *errorpb.Error { + var peerInStore *metapb.Peer + for _, peer := range r.meta.Peers { + if peer.StoreId == storeID { + peerInStore = peer + break + } + } + return &errorpb.Error{ + NotLeader: &errorpb.NotLeader{ + RegionId: r.meta.Id, + Leader: peerInStore, + }, + } + } + switch accessErr { + case NotLeaderWithNewLeader1Err: + regionErr = genNotLeaderErr(1) + case NotLeaderWithNewLeader2Err: + regionErr = genNotLeaderErr(2) + case NotLeaderWithNewLeader3Err: + regionErr = genNotLeaderErr(3) + default: + regionErr, err = accessErr.GenError() + } + if err != nil { + // inject unreachable liveness. + unreachable.injectConstantLiveness(regionCache) + } + return regionErr, err +} + +func (c *replicaSelectorAccessPathCase) Format() string { + label := "" + if c.label != nil { + label = fmt.Sprintf("%v->%v", c.label.Key, c.label.Value) + } + respRegionError := "" + if c.respRegionError != nil { + respRegionError = c.respRegionError.String() + } + accessErr := make([]string, len(c.accessErr)) + for i := range c.accessErr { + accessErr[i] = c.accessErr[i].String() + } + return fmt.Sprintf("{\n"+ + "\treq: %v\n"+ + "\tread_type: %v\n"+ + "\tstale_read: %v\n"+ + "\ttimeout: %v\n"+ + "\tlabel: %v\n"+ + "\taccess_err: %v\n"+ + "\taccess_path: %v\n"+ + "\tresp_err: %v\n"+ + "\tresp_region_err: %v\n"+ + "\tbackoff_cnt: %v\n"+ + "\tbackoff_detail: %v\n"+ + "\tregion_is_valid: %v\n}", + c.reqType, c.readType, c.staleRead, c.timeout, label, strings.Join(accessErr, ", "), strings.Join(c.accessPath, ", "), + c.respErr, respRegionError, c.backoffCnt, strings.Join(c.backoffDetail, ", "), c.regionIsValid) +} + +type RegionErrorType int + +const ( + NotLeaderErr RegionErrorType = iota + 1 + NotLeaderWithNewLeader1Err + NotLeaderWithNewLeader2Err + NotLeaderWithNewLeader3Err + RegionNotFoundErr + KeyNotInRegionErr + EpochNotMatchErr + ServerIsBusyErr + ServerIsBusyWithEstimatedWaitMsErr + StaleCommandErr + StoreNotMatchErr + RaftEntryTooLargeErr + MaxTimestampNotSyncedErr + ReadIndexNotReadyErr + ProposalInMergingModeErr + DataIsNotReadyErr + RegionNotInitializedErr + DiskFullErr + RecoveryInProgressErr + FlashbackInProgressErr + FlashbackNotPreparedErr + IsWitnessErr + MismatchPeerIdErr + BucketVersionNotMatchErr + // following error type is not region error. + DeadLineExceededErr + RegionErrorTypeMax +) + +func (tp RegionErrorType) GenRegionError() *errorpb.Error { + err := &errorpb.Error{} + switch tp { + case NotLeaderErr: + err.NotLeader = &errorpb.NotLeader{} + case RegionNotFoundErr: + err.RegionNotFound = &errorpb.RegionNotFound{} + case KeyNotInRegionErr: + err.KeyNotInRegion = &errorpb.KeyNotInRegion{} + case EpochNotMatchErr: + err.EpochNotMatch = &errorpb.EpochNotMatch{} + case ServerIsBusyErr: + err.ServerIsBusy = &errorpb.ServerIsBusy{} + case ServerIsBusyWithEstimatedWaitMsErr: + err.ServerIsBusy = &errorpb.ServerIsBusy{EstimatedWaitMs: 10} + case StaleCommandErr: + err.StaleCommand = &errorpb.StaleCommand{} + case StoreNotMatchErr: + err.StoreNotMatch = &errorpb.StoreNotMatch{} + case RaftEntryTooLargeErr: + err.RaftEntryTooLarge = &errorpb.RaftEntryTooLarge{} + case MaxTimestampNotSyncedErr: + err.MaxTimestampNotSynced = &errorpb.MaxTimestampNotSynced{} + case ReadIndexNotReadyErr: + err.ReadIndexNotReady = &errorpb.ReadIndexNotReady{} + case ProposalInMergingModeErr: + err.ProposalInMergingMode = &errorpb.ProposalInMergingMode{} + case DataIsNotReadyErr: + err.DataIsNotReady = &errorpb.DataIsNotReady{} + case RegionNotInitializedErr: + err.RegionNotInitialized = &errorpb.RegionNotInitialized{} + case DiskFullErr: + err.DiskFull = &errorpb.DiskFull{} + case RecoveryInProgressErr: + err.RecoveryInProgress = &errorpb.RecoveryInProgress{} + case FlashbackInProgressErr: + err.FlashbackInProgress = &errorpb.FlashbackInProgress{} + case FlashbackNotPreparedErr: + err.FlashbackNotPrepared = &errorpb.FlashbackNotPrepared{} + case IsWitnessErr: + err.IsWitness = &errorpb.IsWitness{} + case MismatchPeerIdErr: + err.MismatchPeerId = &errorpb.MismatchPeerId{} + case BucketVersionNotMatchErr: + err.BucketVersionNotMatch = &errorpb.BucketVersionNotMatch{} + default: + return nil + } + return err +} + +func (tp RegionErrorType) GenError() (*errorpb.Error, error) { + regionErr := tp.GenRegionError() + if regionErr != nil { + return regionErr, nil + } + switch tp { + case DeadLineExceededErr: + return nil, context.DeadlineExceeded + } + return nil, nil +} + +func (tp RegionErrorType) Valid(addr string, req *tikvrpc.Request) bool { + // leader-read. + if !req.StaleRead && !req.ReplicaRead { + switch tp { + case DataIsNotReadyErr: + // DataIsNotReadyErr only return when req is a stale read. + return false + } + } + // replica-read. + if !req.StaleRead && req.ReplicaRead { + switch tp { + case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err: + // NotLeaderErr will not return in replica read. + return false + case DataIsNotReadyErr: + // DataIsNotReadyErr only return when req is a stale read. + return false + } + } + // stale-read. + if req.StaleRead && !req.ReplicaRead { + switch tp { + case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err: + // NotLeaderErr will not return in stale read. + return false + } + } + // store1 can't return a not leader error with new leader in store1. + if addr == "store1" && tp == NotLeaderWithNewLeader1Err { + return false + } + // ditto. + if addr == "store2" && tp == NotLeaderWithNewLeader2Err { + return false + } + // ditto. + if addr == "store3" && tp == NotLeaderWithNewLeader3Err { + return false + } + return true +} + +func (tp RegionErrorType) String() string { + switch tp { + case NotLeaderErr: + return "NotLeaderErr" + case NotLeaderWithNewLeader1Err: + return "NotLeaderWithNewLeader1Err" + case NotLeaderWithNewLeader2Err: + return "NotLeaderWithNewLeader2Err" + case NotLeaderWithNewLeader3Err: + return "NotLeaderWithNewLeader3Err" + case RegionNotFoundErr: + return "RegionNotFoundErr" + case KeyNotInRegionErr: + return "KeyNotInRegionErr" + case EpochNotMatchErr: + return "EpochNotMatchErr" + case ServerIsBusyErr: + return "ServerIsBusyErr" + case ServerIsBusyWithEstimatedWaitMsErr: + return "ServerIsBusyWithEstimatedWaitMsErr" + case StaleCommandErr: + return "StaleCommandErr" + case StoreNotMatchErr: + return "StoreNotMatchErr" + case RaftEntryTooLargeErr: + return "RaftEntryTooLargeErr" + case MaxTimestampNotSyncedErr: + return "MaxTimestampNotSyncedErr" + case ReadIndexNotReadyErr: + return "ReadIndexNotReadyErr" + case ProposalInMergingModeErr: + return "ProposalInMergingModeErr" + case DataIsNotReadyErr: + return "DataIsNotReadyErr" + case RegionNotInitializedErr: + return "RegionNotInitializedErr" + case DiskFullErr: + return "DiskFullErr" + case RecoveryInProgressErr: + return "RecoveryInProgressErr" + case FlashbackInProgressErr: + return "FlashbackInProgressErr" + case FlashbackNotPreparedErr: + return "FlashbackNotPreparedErr" + case IsWitnessErr: + return "IsWitnessErr" + case MismatchPeerIdErr: + return "MismatchPeerIdErr" + case BucketVersionNotMatchErr: + return "BucketVersionNotMatchErr" + case DeadLineExceededErr: + return "DeadLineExceededErr" + default: + return "unknown_" + strconv.Itoa(int(tp)) + } +} diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go index 562c9e9db7..d67c846b3c 100644 --- a/internal/locate/slow_score.go +++ b/internal/locate/slow_score.go @@ -153,6 +153,11 @@ func (ss *SlowScoreStat) markAlreadySlow() { atomic.StoreUint64(&ss.avgScore, slowScoreMax) } +// resetSlowScore resets the slow score to 0. It's used for test. +func (ss *SlowScoreStat) resetSlowScore() { + atomic.StoreUint64(&ss.avgScore, 0) +} + func (ss *SlowScoreStat) isSlow() bool { return ss.getSlowScore() >= slowScoreThreshold }