stale read request shouldn't retry the leader if it has already been tried
Signed-off-by: crazycs520 <[email protected]>
crazycs520 committed Feb 23, 2024
1 parent 8d28d3c commit 1f7eb94
Showing 8 changed files with 562 additions and 218 deletions.
6 changes: 6 additions & 0 deletions config/retry/config.go
@@ -46,6 +46,7 @@ import (
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

@@ -191,6 +192,11 @@ func newBackoffFn(base, cap, jitter int) backoffFn {
if maxSleepMs >= 0 && realSleep > maxSleepMs {
realSleep = maxSleepMs
}
if _, err := util.EvalFailpoint("fastBackoffBySkipSleep"); err == nil {
attempts++
lastSleep = sleep
return realSleep
}
select {
case <-time.After(time.Duration(realSleep) * time.Millisecond):
attempts++
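
The `fastBackoffBySkipSleep` failpoint added above lets tests advance the backoff state without actually sleeping. A minimal sketch of how a test can enable it, mirroring the pattern the updated region_request3_test.go uses below (the test name here is illustrative, not part of this commit):

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestSomethingWithFastBackoff(t *testing.T) {
	// Activate the failpoint so the backoff function returns the computed
	// sleep duration immediately instead of blocking in the select.
	require.NoError(t, failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
	defer func() {
		require.NoError(t, failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
	}()
	// ... exercise retry logic that would otherwise sleep for real ...
}
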
3 changes: 3 additions & 0 deletions internal/locate/region_cache.go
@@ -2843,6 +2843,9 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
reResolveInterval = dur
}
}
if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
return
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
return
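
Likewise, the `skipStoreCheckUntilHealth` failpoint prevents the background health-check goroutine from being spawned, so liveness states injected by a test are not overwritten while assertions run. A hypothetical helper showing the intended usage, with the same imports as the previous sketch (the helper and the `tikvclient/` failpoint path are assumptions following the convention above, not code from this commit):

// Hypothetical test helper: run fn with the store health-check loop disabled
// so injected liveness states stay exactly where the test put them.
func withoutHealthCheckLoop(t *testing.T, fn func()) {
	require.NoError(t, failpoint.Enable("tikvclient/skipStoreCheckUntilHealth", `return`))
	defer func() {
		require.NoError(t, failpoint.Disable("tikvclient/skipStoreCheckUntilHealth"))
	}()
	fn()
}
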
23 changes: 8 additions & 15 deletions internal/locate/region_request.go
@@ -73,6 +73,8 @@ import (
// network error because tidb-server expect tikv client to exit as soon as possible.
var shuttingDown uint32

var randIntn = rand.Intn

// StoreShuttingDown atomically stores ShuttingDown into v.
func StoreShuttingDown(v uint32) {
atomic.StoreUint32(&shuttingDown, v)
@@ -612,7 +614,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
}

// Skip advanceCnt valid candidates to find a proxy peer randomly
advanceCnt := rand.Intn(candidateNum)
advanceCnt := randIntn(candidateNum)
for idx, replica := range selector.replicas {
if !state.isCandidate(AccessIndex(idx), replica) {
continue
@@ -668,13 +670,13 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(replicaSize))
state.lastIdx = AccessIndex(randIntn(replicaSize))
} else {
if replicaSize <= 1 {
state.lastIdx = state.leaderIdx
} else {
// Randomly select a non-leader peer
state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1))
state.lastIdx = AccessIndex(randIntn(replicaSize - 1))
if state.lastIdx >= state.leaderIdx {
state.lastIdx++
}
@@ -696,7 +698,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
var offset int
if state.lastIdx >= 0 {
offset = rand.Intn(replicaSize)
offset = randIntn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
@@ -791,16 +793,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}

func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// Allow another extra retry for the following case:
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
// 2. Data is not ready is returned from the leader peer.
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2, 0)
} else {
return leader.isExhausted(1, 0)
}
return leader.isExhausted(1, 0)
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -845,7 +838,7 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector
// Select a follower replica that has the lowest estimated wait duration
minWait := time.Duration(math.MaxInt64)
targetIdx := state.leaderIdx
startIdx := rand.Intn(len(selector.replicas))
startIdx := randIntn(len(selector.replicas))
for i := 0; i < len(selector.replicas); i++ {
idx := (i + startIdx) % len(selector.replicas)
r := selector.replicas[idx]
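
Two changes in this file are worth calling out. First, `IsLeaderExhausted` no longer grants the leader an extra attempt when a stale read falls back to a leader-only snapshot read; once the leader has been tried, the request is not retried against it, which is the behavior change named in the commit title. Second, the direct `rand.Intn` calls are routed through the package-level `randIntn` variable, so tests can pin replica selection. A minimal sketch of the latter, assuming a test inside the `locate` package (the test name and stub are illustrative, not part of this commit):

func TestReplicaSelectionDeterministic(t *testing.T) {
	// Swap in a stub so every "random" choice picks index 0, and restore the
	// real rand.Intn when the test finishes.
	original := randIntn
	randIntn = func(n int) int { return 0 }
	defer func() { randIntn = original }()

	// ... build a replicaSelector and assert which replica next() returns ...
}
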
103 changes: 4 additions & 99 deletions internal/locate/region_request3_test.go
@@ -759,6 +759,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other
// tests but it's a dedicated one to test sending requests with the replica selector.
func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
}()
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -987,49 +991,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}

// Verify switch to the leader immediately when stale read requests with global txn scope meet region errors.
s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
stores[1].getLivenessState() == reachable &&
stores[2].getLivenessState() == reachable
}, 3*time.Second, 200*time.Millisecond)
reloadRegion()
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
for i := 0; i < 10; i++ {
req.EnableStaleWithMixedReplicaRead()
// The request may be sent to the leader directly. We have to distinguish it.
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed
if failureOnFollower == 0 && failureOnLeader == 0 {
// if the request goes to the leader and succeeds then it is executed as a StaleRead
s.True(req.StaleRead)
} else {
// otherwise #leaderOnly flag should be set and retry request as a normal read
s.True(state.option.leaderOnly)
s.False(req.StaleRead)
}
}
}

func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
@@ -1223,62 +1184,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first time on leader.
if isFirstReq {
isFirstReq = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}

region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID())
s.True(region.isValid())

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleWithMixedReplicaRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}

func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
leaderAddr := ""
reqTargetAddrs := make(map[string]struct{})
71 changes: 3 additions & 68 deletions internal/locate/region_request_state_test.go
@@ -348,17 +348,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z2"},
followerSuccessReadType: SuccessStaleRead,
},
{
do: leaderDataIsNotReady,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z2"},
followerSuccessReadType: SuccessStaleRead,
},
{
do: followerDataIsNotReady,
leaderRegionValid: true,
@@ -395,45 +384,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerDown},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderServerIsBusy,
extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
@@ -598,10 +548,11 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
return
}

msg := fmt.Sprintf("%v %#v", string(resp.Resp.(*kvrpcpb.GetResponse).Value), r)
_, successZone, successReadType := s.extractResp(resp)
find := false
if leaderZone {
s.Equal(r.leaderSuccessReadType, successReadType)
s.Equal(r.leaderSuccessReadType, successReadType, msg)
for _, z := range r.leaderSuccessReplica {
if z == successZone {
find = true
@@ -617,7 +568,7 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
}
}
}
s.True(find)
s.True(find, msg)
}

type Option[T interface{}] struct {
@@ -767,22 +718,6 @@ func leaderDownAndElect(s *testRegionCacheStaleReadSuite) {
s.setUnavailableStore(leader.Id)
}

func leaderDataIsNotReady(s *testRegionCacheStaleReadSuite) {
peerID, _ := s.getLeader()
s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
if !req.StaleRead || zone != "z1" {
return nil
}
return &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{
RegionId: s.regionID,
PeerId: peerID,
SafeTs: 0,
},
}
}
}

func leaderServerIsBusy(s *testRegionCacheStaleReadSuite) {
s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
if zone != "z1" {
36 changes: 0 additions & 36 deletions internal/locate/region_request_test.go
@@ -679,42 +679,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.Equal(target, client.closedAddr)
}

func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
req.EnableStaleWithMixedReplicaRead()
req.ReadReplicaScope = "z1" // not global stale read.
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if req.StaleRead {
// Mock for stale-read request always return DataIsNotReady error when tikv `ResolvedTS` is blocked.
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}},
}}
} else {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}
}
return response, nil
}}

bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
}

func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() {
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxBatchSize = 0