diff --git a/internal/client/client.go b/internal/client/client.go index e6a65ce6f..9bed1129f 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -138,6 +138,10 @@ func (e *ErrConn) Error() string { return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error()) } +func (e *ErrConn) Cause() error { + return e.Err +} + func (e *ErrConn) Unwrap() error { return e.Err } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 6820152bb..ebce8dd87 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" @@ -3215,6 +3216,71 @@ func (s *testReplicaSelectorSuite) getRegion() *Region { return nil } +func TestTiKVClientReadTimeout(t *testing.T) { + if israce.RaceEnabled { + t.Skip("the test run with race will failed, so skip it") + } + config.UpdateGlobal(func(conf *config.Config) { + // enable batch client. + conf.TiKVClient.MaxBatchSize = 128 + })() + s := new(testReplicaSelectorSuite) + s.SetupTest(t) + defer s.TearDownTest() + + server, port := mockserver.StartMockTikvService() + s.True(port > 0) + server.SetMetaChecker(func(ctx context.Context) error { + time.Sleep(time.Millisecond * 10) + return nil + }) + rpcClient := client.NewRPCClient() + defer func() { + rpcClient.Close() + server.Stop() + }() + + accessPath := []string{} + fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + accessPath = append(accessPath, addr) + accessPath = append(accessPath, fmt.Sprintf("{addr: %v, replica-read: %v, stale-read: %v, timeout: %v}", addr, req.ReplicaRead, req.StaleRead, req.MaxExecutionDurationMs)) + return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) + }} + rc := s.getRegion() + s.NotNil(rc) + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key"), Version: 1}) + req.ReplicaReadType = kv.ReplicaReadLeader + req.MaxExecutionDurationMs = 1 + bo := retry.NewBackofferWithVars(context.Background(), 2000, nil) + sender := NewRegionRequestSender(s.cache, fnClient) + resp, _, err := sender.SendReq(bo, req, rc.VerID(), time.Millisecond) + s.Nil(err) + s.NotNil(resp) + regionErr, _ := resp.GetRegionError() + s.True(IsFakeRegionError(regionErr)) + s.Equal(0, bo.GetTotalBackoffTimes()) + s.Equal([]string{ + "store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}", + "store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}", + "store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}", + }, accessPath) + // clear max execution duration for retry. + req.MaxExecutionDurationMs = 0 + sender = NewRegionRequestSender(s.cache, fnClient) + resp, _, err = sender.SendReq(bo, req, rc.VerID(), time.Second) // use a longer timeout. + s.Nil(err) + s.NotNil(resp) + regionErr, _ = resp.GetRegionError() + s.Nil(regionErr) + s.Equal(0, bo.GetTotalBackoffTimes()) + s.Equal([]string{ + "store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}", + "store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}", + "store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}", + "store1", "{addr: store1, replica-read: true, stale-read: false, timeout: 1000}", + }, accessPath) +} + func BenchmarkReplicaSelector(b *testing.B) { mvccStore := mocktikv.MustNewMVCCStore() cluster := mocktikv.NewCluster(mvccStore)