From 4f1e8eeb0e370eeafabda7d19b0b08152be1d5a2 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 14 Mar 2024 10:52:11 +0800 Subject: [PATCH 1/6] fix the no-available-connections issue caused by a concurrency request limit bug Signed-off-by: crazycs520 --- internal/client/client_batch.go | 9 +++- internal/client/client_test.go | 83 +++++++++++++++++++++++++++++++++ internal/client/main_test.go | 1 + 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 3964efc1f1..42eeea78ea 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -567,7 +567,14 @@ func (c *batchCommandsClient) isStopped() bool { } func (c *batchCommandsClient) available() int64 { - return c.maxConcurrencyRequestLimit.Load() - c.sent.Load() + limit := c.maxConcurrencyRequestLimit.Load() + sent := c.sent.Load() + // just in case, `sent` may less than 0, see https://github.com/tikv/client-go/issues/1225 + // then maxInt64 - (-1) will overflow, which is unexpected. 
+ if sent > 0 { + return limit - sent + } + return limit } func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 8794a59358..9897c3c404 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -59,6 +59,7 @@ import ( "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/israce" "go.uber.org/zap" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" @@ -879,3 +880,85 @@ func TestBatchClientReceiveHealthFeedback(t *testing.T) { assert.Fail(t, "health feedback not received") } } + +func TestRandomRestartStoreAndForwarding(t *testing.T) { + if israce.RaceEnabled { + t.Skip("skip since race bug in issue #1222") + } + store1, port1 := mockserver.StartMockTikvService() + require.True(t, port1 > 0) + require.True(t, store1.IsRunning()) + addr1 := store1.Addr() + client1 := NewRPCClient() + store2, port2 := mockserver.StartMockTikvService() + require.True(t, port2 > 0) + require.True(t, store2.IsRunning()) + addr2 := store2.Addr() + defer func() { + store1.Stop() + store2.Stop() + err := client1.Close() + require.NoError(t, err) + }() + + conn, err := client1.getConnArray(addr1, true) + assert.Nil(t, err) + wg := sync.WaitGroup{} + done := int64(0) + concurrency := 500 + wg.Add(1) + go func() { + defer wg.Done() + for { + // intermittent stop and start store1 or store2. 
+ var store *mockserver.MockServer + if rand.Intn(10) < 9 { + store = store1 + } else { + store = store2 + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) + store.Stop() + require.False(t, store.IsRunning()) + time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) + store.Start(addr1) + if atomic.LoadInt64(&done) >= int64(concurrency) { + return + } + } + }() + for j := 0; j < concurrency; j++ { + wg.Add(1) + go func() { + defer func() { + atomic.AddInt64(&done, 1) + wg.Done() + }() + for i := 0; i < 5000; i++ { + req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} + forwardedHost := "" + if i%2 != 0 { + forwardedHost = addr2 + } + _, err := sendBatchRequest(context.Background(), addr1, forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) + if err == nil || + err.Error() == "EOF" || + err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" || + strings.Contains(err.Error(), "context deadline exceeded") || + strings.Contains(err.Error(), "connect: connection refused") || + strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") { + continue + } + require.Fail(t, err.Error(), "unexpected error") + } + }() + } + wg.Wait() + + for _, cli := range conn.batchConn.batchCommandsClients { + require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load()) + require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load())) + // TODO(crazycs520): fix me, see https://github.com/tikv/client-go/pull/1219 + //require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load())) + } +} diff --git a/internal/client/main_test.go b/internal/client/main_test.go index 2e031f98e9..cafc6ec320 100644 --- a/internal/client/main_test.go +++ b/internal/client/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { 
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.newBackoffFn.func1"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), + goleak.IgnoreTopFunction("sync.runtime_notifyListWait"), } goleak.VerifyTestMain(m, opts...) } From 0b798dd92f9119d477ba2646361a4d3c8193b4ad Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 14 Mar 2024 13:28:01 +0800 Subject: [PATCH 2/6] fix test Signed-off-by: crazycs520 --- internal/client/client_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 9897c3c404..c456cf693d 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -918,10 +918,11 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { store = store2 } time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) + addr := store.Addr() store.Stop() require.False(t, store.IsRunning()) time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) - store.Start(addr1) + store.Start(addr) if atomic.LoadInt64(&done) >= int64(concurrency) { return } From acc54ff030ceec912f69f50aa6ec183707955a1b Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 15 Mar 2024 10:14:09 +0800 Subject: [PATCH 3/6] refine comment Signed-off-by: crazycs520 --- internal/client/client_batch.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 42eeea78ea..0d0ba857ae 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -569,8 +569,7 @@ func (c *batchCommandsClient) isStopped() bool { func (c *batchCommandsClient) available() int64 { limit := c.maxConcurrencyRequestLimit.Load() sent := c.sent.Load() - // just in case, `sent` may less than 0, see https://github.com/tikv/client-go/issues/1225 - // then 
maxInt64 - (-1) will overflow, which is unexpected. + // The `sent` could be less than 0, see https://github.com/tikv/client-go/issues/1225 for details. if sent > 0 { return limit - sent } From a88a6102924f08e94ac52ddcc7c2b3e52168f1ad Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 15 Mar 2024 10:32:29 +0800 Subject: [PATCH 4/6] refine code Signed-off-by: crazycs520 --- internal/client/client_batch.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index eebead2293..d29606f90a 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -571,7 +571,10 @@ func (c *batchCommandsClient) available() int64 { sent := c.sent.Load() // The `sent` could be less than 0, see https://github.com/tikv/client-go/issues/1225 for details. if sent > 0 { - return limit - sent + if limit > sent { + return limit - sent + } + return 0 } return limit } From 6e0430c0c8e7ce0a25613e6ca890dcb8a9733e21 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 15 Mar 2024 10:38:52 +0800 Subject: [PATCH 5/6] refine test Signed-off-by: crazycs520 --- internal/client/client_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 3777b79dbb..68a2c4a019 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -894,12 +894,10 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { store1, port1 := mockserver.StartMockTikvService() require.True(t, port1 > 0) require.True(t, store1.IsRunning()) - addr1 := store1.Addr() client1 := NewRPCClient() store2, port2 := mockserver.StartMockTikvService() require.True(t, port2 > 0) require.True(t, store2.IsRunning()) - addr2 := store2.Addr() defer func() { store1.Stop() store2.Stop() @@ -907,8 +905,6 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { require.NoError(t, err) }() - conn, err := 
client1.getConnArray(addr1, true) - assert.Nil(t, err) wg := sync.WaitGroup{} done := int64(0) concurrency := 500 @@ -934,6 +930,9 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { } } }() + + conn, err := client1.getConnArray(store1.Addr(), true) + assert.Nil(t, err) for j := 0; j < concurrency; j++ { wg.Add(1) go func() { @@ -945,9 +944,9 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} forwardedHost := "" if i%2 != 0 { - forwardedHost = addr2 + forwardedHost = store2.Addr() } - _, err := sendBatchRequest(context.Background(), addr1, forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) + _, err = sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) if err == nil || err.Error() == "EOF" || err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" || From 78647461bbcc0106e22b051a60236ca11bd49aaf Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 15 Mar 2024 10:39:50 +0800 Subject: [PATCH 6/6] fix test Signed-off-by: crazycs520 --- internal/client/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 68a2c4a019..60d282ea74 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -946,7 +946,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { if i%2 != 0 { forwardedHost = store2.Addr() } - _, err = sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) + _, err := sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) if err == nil || err.Error() == "EOF" || err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||