diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index ece58c3fc3..d29606f90a 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -567,7 +567,16 @@ func (c *batchCommandsClient) isStopped() bool { } func (c *batchCommandsClient) available() int64 { - return c.maxConcurrencyRequestLimit.Load() - c.sent.Load() + limit := c.maxConcurrencyRequestLimit.Load() + sent := c.sent.Load() + // The `sent` could be less than 0, see https://github.com/tikv/client-go/issues/1225 for details. + if sent > 0 { + if limit > sent { + return limit - sent + } + return 0 + } + return limit } func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index e70a4585f5..60d282ea74 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -59,6 +59,7 @@ import ( "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/israce" "go.uber.org/zap" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" @@ -886,6 +887,88 @@ func TestBatchClientReceiveHealthFeedback(t *testing.T) { } } +func TestRandomRestartStoreAndForwarding(t *testing.T) { + if israce.RaceEnabled { + t.Skip("skip since race bug in issue #1222") + } + store1, port1 := mockserver.StartMockTikvService() + require.True(t, port1 > 0) + require.True(t, store1.IsRunning()) + client1 := NewRPCClient() + store2, port2 := mockserver.StartMockTikvService() + require.True(t, port2 > 0) + require.True(t, store2.IsRunning()) + defer func() { + store1.Stop() + store2.Stop() + err := client1.Close() + require.NoError(t, err) + }() + + wg := sync.WaitGroup{} + done := int64(0) + concurrency := 500 + wg.Add(1) + go func() { + defer wg.Done() + for { + // intermittent stop and start store1 or store2. + var store *mockserver.MockServer + if rand.Intn(10) < 9 { + store = store1 + } else { + store = store2 + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) + addr := store.Addr() + store.Stop() + require.False(t, store.IsRunning()) + time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) + store.Start(addr) + if atomic.LoadInt64(&done) >= int64(concurrency) { + return + } + } + }() + + conn, err := client1.getConnArray(store1.Addr(), true) + assert.Nil(t, err) + for j := 0; j < concurrency; j++ { + wg.Add(1) + go func() { + defer func() { + atomic.AddInt64(&done, 1) + wg.Done() + }() + for i := 0; i < 5000; i++ { + req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} + forwardedHost := "" + if i%2 != 0 { + forwardedHost = store2.Addr() + } + _, err := sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) + if err == nil || + err.Error() == "EOF" || + err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" || + strings.Contains(err.Error(), "context deadline exceeded") || + strings.Contains(err.Error(), "connect: connection refused") || + strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") { + continue + } + require.Fail(t, err.Error(), "unexpected error") + } + }() + } + wg.Wait() + + for _, cli := range conn.batchConn.batchCommandsClients { + require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load()) + require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load())) + // TODO(crazycs520): fix me, see https://github.com/tikv/client-go/pull/1219 + //require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load())) + } +} + func TestErrConn(t *testing.T) { e := errors.New("conn error") err1 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10} diff --git a/internal/client/main_test.go b/internal/client/main_test.go index 2e031f98e9..cafc6ec320 100644 --- a/internal/client/main_test.go +++ b/internal/client/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.newBackoffFn.func1"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), + goleak.IgnoreTopFunction("sync.runtime_notifyListWait"), } goleak.VerifyTestMain(m, opts...) }