stale read request shouldn't retry leader if leader is already tried #1174

Merged: 1 commit, Feb 23, 2024
6 changes: 6 additions & 0 deletions config/retry/config.go
@@ -46,6 +46,7 @@ import (
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

@@ -191,6 +192,11 @@ func newBackoffFn(base, cap, jitter int) backoffFn {
if maxSleepMs >= 0 && realSleep > maxSleepMs {
realSleep = maxSleepMs
}
if _, err := util.EvalFailpoint("fastBackoffBySkipSleep"); err == nil {
attempts++
lastSleep = sleep
return realSleep
}
select {
case <-time.After(time.Duration(realSleep) * time.Millisecond):
attempts++
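Note on the new failpoint above: when fastBackoffBySkipSleep is active, newBackoffFn still updates its attempt counter and lastSleep bookkeeping but returns without sleeping, so retry-heavy tests run without real waiting. Below is a minimal sketch of how a test could toggle it; the test name is hypothetical, and the tikvclient/ prefix plus the github.com/pingcap/failpoint package are assumptions based on the test changes later in this diff.

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestWithFastBackoff(t *testing.T) {
	// Skip real backoff sleeps for the duration of this test.
	if err := failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable("tikvclient/fastBackoffBySkipSleep"); err != nil {
			t.Fatal(err)
		}
	}()
	// ... exercise code paths that back off on retries; each backoff still
	// advances its counters but returns immediately.
}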
3 changes: 3 additions & 0 deletions internal/locate/region_cache.go
@@ -2843,6 +2843,9 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
reResolveInterval = dur
}
}
if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
return
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
return
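The skipStoreCheckUntilHealth failpoint above keeps requestLivenessAndStartHealthCheckLoopIfNeeded from spawning the background checkUntilHealth goroutine, so liveness state injected by a test is not overwritten asynchronously. A hedged sketch of one way a test could enable both new failpoints together; the helper name and the use of t.Cleanup are illustrative, not part of this PR, and the imports match the previous sketch.

// enableTestFailpoints is a hypothetical helper that turns on both failpoints
// added in this PR and restores them when the test finishes.
func enableTestFailpoints(t *testing.T) {
	for _, fp := range []string{
		"tikvclient/fastBackoffBySkipSleep",
		"tikvclient/skipStoreCheckUntilHealth",
	} {
		fp := fp
		if err := failpoint.Enable(fp, `return`); err != nil {
			t.Fatal(err)
		}
		t.Cleanup(func() {
			if err := failpoint.Disable(fp); err != nil {
				t.Error(err)
			}
		})
	}
}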
24 changes: 9 additions & 15 deletions internal/locate/region_request.go
@@ -73,6 +73,9 @@ import (
// network error because tidb-server expect tikv client to exit as soon as possible.
var shuttingDown uint32

// randIntn is only use for testing.
var randIntn = rand.Intn

// StoreShuttingDown atomically stores ShuttingDown into v.
func StoreShuttingDown(v uint32) {
atomic.StoreUint32(&shuttingDown, v)
@@ -612,7 +615,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
}

// Skip advanceCnt valid candidates to find a proxy peer randomly
advanceCnt := rand.Intn(candidateNum)
advanceCnt := randIntn(candidateNum)
for idx, replica := range selector.replicas {
if !state.isCandidate(AccessIndex(idx), replica) {
continue
@@ -668,13 +671,13 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(replicaSize))
state.lastIdx = AccessIndex(randIntn(replicaSize))
} else {
if replicaSize <= 1 {
state.lastIdx = state.leaderIdx
} else {
// Randomly select a non-leader peer
state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1))
state.lastIdx = AccessIndex(randIntn(replicaSize - 1))
if state.lastIdx >= state.leaderIdx {
state.lastIdx++
}
@@ -696,7 +699,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
var offset int
if state.lastIdx >= 0 {
offset = rand.Intn(replicaSize)
offset = randIntn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
@@ -791,16 +794,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}

func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// Allow another extra retry for the following case:
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
// 2. Data is not ready is returned from the leader peer.
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2, 0)
} else {
return leader.isExhausted(1, 0)
}
return leader.isExhausted(1, 0)
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -845,7 +839,7 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector
// Select a follower replica that has the lowest estimated wait duration
minWait := time.Duration(math.MaxInt64)
targetIdx := state.leaderIdx
startIdx := rand.Intn(len(selector.replicas))
startIdx := randIntn(len(selector.replicas))
for i := 0; i < len(selector.replicas); i++ {
idx := (i + startIdx) % len(selector.replicas)
r := selector.replicas[idx]
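Two changes land in this file: IsLeaderExhausted now permits only a single attempt on the leader, dropping the extra retry that previously covered the stale-read fallback case (the behavior named in the PR title), and rand.Intn is wrapped in the package-level randIntn variable so tests can make replica selection deterministic. A sketch of how a test in the locate package might stub it; the test name and the always-zero choice are illustrative only.

func (s *testRegionRequestToThreeStoresSuite) TestDeterministicReplicaPick() {
	oldRandIntn := randIntn
	// Always pick the first candidate so the chosen replica is predictable.
	randIntn = func(n int) int { return 0 }
	defer func() { randIntn = oldRandIntn }()

	// ... send a request through the replica selector and assert which
	// store it targeted ...
}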
103 changes: 4 additions & 99 deletions internal/locate/region_request3_test.go
@@ -759,6 +759,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other
// tests but it's a dedicated one to test sending requests with the replica selector.
func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
}()
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -987,49 +991,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}

// Verify switch to the leader immediately when stale read requests with global txn scope meet region errors.
s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
stores[1].getLivenessState() == reachable &&
stores[2].getLivenessState() == reachable
}, 3*time.Second, 200*time.Millisecond)
reloadRegion()
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
for i := 0; i < 10; i++ {
req.EnableStaleWithMixedReplicaRead()
// The request may be sent to the leader directly. We have to distinguish it.
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed
if failureOnFollower == 0 && failureOnLeader == 0 {
// if the request goes to the leader and succeeds then it is executed as a StaleRead
s.True(req.StaleRead)
} else {
// otherwise #leaderOnly flag should be set and retry request as a normal read
s.True(state.option.leaderOnly)
s.False(req.StaleRead)
}
}
}

func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
@@ -1223,62 +1184,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first time on leader.
if isFirstReq {
isFirstReq = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}

region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID())
s.True(region.isValid())

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleWithMixedReplicaRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}

func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
leaderAddr := ""
reqTargetAddrs := make(map[string]struct{})
71 changes: 3 additions & 68 deletions internal/locate/region_request_state_test.go
@@ -348,17 +348,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z2"},
followerSuccessReadType: SuccessStaleRead,
},
{
do: leaderDataIsNotReady,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z2"},
followerSuccessReadType: SuccessStaleRead,
},
{
do: followerDataIsNotReady,
leaderRegionValid: true,
@@ -395,45 +384,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderDataIsNotReady,
extra: []func(suite *testRegionCacheStaleReadSuite){followerDown},
recoverable: true,
leaderRegionValid: true,
leaderAsyncReload: Some(false),
leaderSuccessReplica: []string{"z1"},
leaderSuccessReadType: SuccessLeaderRead,
followerRegionValid: true,
followerAsyncReload: Some(false),
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
{
do: leaderServerIsBusy,
extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
@@ -598,10 +548,11 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
return
}

msg := fmt.Sprintf("%v %#v", string(resp.Resp.(*kvrpcpb.GetResponse).Value), r)
_, successZone, successReadType := s.extractResp(resp)
find := false
if leaderZone {
s.Equal(r.leaderSuccessReadType, successReadType)
s.Equal(r.leaderSuccessReadType, successReadType, msg)
for _, z := range r.leaderSuccessReplica {
if z == successZone {
find = true
@@ -617,7 +568,7 @@ }
}
}
}
s.True(find)
s.True(find, msg)
}

type Option[T interface{}] struct {
@@ -767,22 +718,6 @@ func leaderDownAndElect(s *testRegionCacheStaleReadSuite) {
s.setUnavailableStore(leader.Id)
}

func leaderDataIsNotReady(s *testRegionCacheStaleReadSuite) {
peerID, _ := s.getLeader()
s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
if !req.StaleRead || zone != "z1" {
return nil
}
return &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{
RegionId: s.regionID,
PeerId: peerID,
SafeTs: 0,
},
}
}
}

func leaderServerIsBusy(s *testRegionCacheStaleReadSuite) {
s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
if zone != "z1" {
36 changes: 0 additions & 36 deletions internal/locate/region_request_test.go
@@ -679,42 +679,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.Equal(target, client.closedAddr)
}

func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
req.EnableStaleWithMixedReplicaRead()
req.ReadReplicaScope = "z1" // not global stale read.
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if req.StaleRead {
// Mock for stale-read request always return DataIsNotReady error when tikv `ResolvedTS` is blocked.
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}},
}}
} else {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}
}
return response, nil
}}

bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
}

func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() {
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxBatchSize = 0