Skip to content

Commit

Permalink
fix issue read req timeout bug cause by pr#1223 (#1232)
Browse files Browse the repository at this point in the history
* fix issue read req timeout bug cause by pr#1223

Signed-off-by: crazycs520 <[email protected]>

* add test

Signed-off-by: crazycs520 <[email protected]>

* fix test

Signed-off-by: crazycs520 <[email protected]>

* implement Cause

Signed-off-by: crazycs520 <[email protected]>

* fix race test

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
Co-authored-by: you06 <[email protected]>
  • Loading branch information
crazycs520 and you06 authored Mar 16, 2024
1 parent 87a984a commit 98a7df8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (e *ErrConn) Error() string {
return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error())
}

func (e *ErrConn) Cause() error {
return e.Err
}

func (e *ErrConn) Unwrap() error {
return e.Err
}
Expand Down
66 changes: 66 additions & 0 deletions internal/locate/replica_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -3215,6 +3216,71 @@ func (s *testReplicaSelectorSuite) getRegion() *Region {
return nil
}

func TestTiKVClientReadTimeout(t *testing.T) {
if israce.RaceEnabled {
t.Skip("the test run with race will failed, so skip it")
}
config.UpdateGlobal(func(conf *config.Config) {
// enable batch client.
conf.TiKVClient.MaxBatchSize = 128
})()
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()

server, port := mockserver.StartMockTikvService()
s.True(port > 0)
server.SetMetaChecker(func(ctx context.Context) error {
time.Sleep(time.Millisecond * 10)
return nil
})
rpcClient := client.NewRPCClient()
defer func() {
rpcClient.Close()
server.Stop()
}()

accessPath := []string{}
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
accessPath = append(accessPath, addr)
accessPath = append(accessPath, fmt.Sprintf("{addr: %v, replica-read: %v, stale-read: %v, timeout: %v}", addr, req.ReplicaRead, req.StaleRead, req.MaxExecutionDurationMs))
return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
}}
rc := s.getRegion()
s.NotNil(rc)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key"), Version: 1})
req.ReplicaReadType = kv.ReplicaReadLeader
req.MaxExecutionDurationMs = 1
bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
sender := NewRegionRequestSender(s.cache, fnClient)
resp, _, err := sender.SendReq(bo, req, rc.VerID(), time.Millisecond)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.True(IsFakeRegionError(regionErr))
s.Equal(0, bo.GetTotalBackoffTimes())
s.Equal([]string{
"store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}",
"store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}",
"store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}",
}, accessPath)
// clear max execution duration for retry.
req.MaxExecutionDurationMs = 0
sender = NewRegionRequestSender(s.cache, fnClient)
resp, _, err = sender.SendReq(bo, req, rc.VerID(), time.Second) // use a longer timeout.
s.Nil(err)
s.NotNil(resp)
regionErr, _ = resp.GetRegionError()
s.Nil(regionErr)
s.Equal(0, bo.GetTotalBackoffTimes())
s.Equal([]string{
"store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}",
"store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}",
"store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}",
"store1", "{addr: store1, replica-read: true, stale-read: false, timeout: 1000}",
}, accessPath)
}

func BenchmarkReplicaSelector(b *testing.B) {
mvccStore := mocktikv.MustNewMVCCStore()
cluster := mocktikv.NewCluster(mvccStore)
Expand Down

0 comments on commit 98a7df8

Please sign in to comment.