Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue that no available connections cause by concurrency request limit bug #1226

Merged
merged 9 commits into from
Mar 15, 2024
11 changes: 10 additions & 1 deletion internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,16 @@ func (c *batchCommandsClient) isStopped() bool {
}

func (c *batchCommandsClient) available() int64 {
return c.maxConcurrencyRequestLimit.Load() - c.sent.Load()
limit := c.maxConcurrencyRequestLimit.Load()
sent := c.sent.Load()
// The `sent` could be less than 0, see https://github.com/tikv/client-go/issues/1225 for details.
if sent > 0 {
if limit > sent {
return limit - sent
}
return 0
}
return limit
}

func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
Expand Down
83 changes: 83 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/israce"
"go.uber.org/zap"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -886,6 +887,88 @@ func TestBatchClientReceiveHealthFeedback(t *testing.T) {
}
}

func TestRandomRestartStoreAndForwarding(t *testing.T) {
if israce.RaceEnabled {
t.Skip("skip since race bug in issue #1222")
}
store1, port1 := mockserver.StartMockTikvService()
require.True(t, port1 > 0)
require.True(t, store1.IsRunning())
client1 := NewRPCClient()
store2, port2 := mockserver.StartMockTikvService()
require.True(t, port2 > 0)
require.True(t, store2.IsRunning())
defer func() {
store1.Stop()
store2.Stop()
err := client1.Close()
require.NoError(t, err)
}()

wg := sync.WaitGroup{}
done := int64(0)
concurrency := 500
wg.Add(1)
go func() {
defer wg.Done()
for {
// intermittent stop and start store1 or store2.
var store *mockserver.MockServer
if rand.Intn(10) < 9 {
store = store1
} else {
store = store2
}
time.Sleep(time.Millisecond * time.Duration(rand.Intn(200)))
addr := store.Addr()
store.Stop()
require.False(t, store.IsRunning())
time.Sleep(time.Millisecond * time.Duration(rand.Intn(200)))
store.Start(addr)
if atomic.LoadInt64(&done) >= int64(concurrency) {
return
}
}
}()

conn, err := client1.getConnArray(store1.Addr(), true)
assert.Nil(t, err)
for j := 0; j < concurrency; j++ {
wg.Add(1)
go func() {
defer func() {
atomic.AddInt64(&done, 1)
wg.Done()
}()
for i := 0; i < 5000; i++ {
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
forwardedHost := ""
if i%2 != 0 {
forwardedHost = store2.Addr()
}
_, err := sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0)
if err == nil ||
err.Error() == "EOF" ||
err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||
strings.Contains(err.Error(), "context deadline exceeded") ||
strings.Contains(err.Error(), "connect: connection refused") ||
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") {
continue
}
require.Fail(t, err.Error(), "unexpected error")
}
}()
}
wg.Wait()

for _, cli := range conn.batchConn.batchCommandsClients {
require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load())
require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
// TODO(crazycs520): fix me, see https://github.com/tikv/client-go/pull/1219
//require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
}
}

func TestErrConn(t *testing.T) {
e := errors.New("conn error")
err1 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10}
Expand Down
1 change: 1 addition & 0 deletions internal/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.newBackoffFn.func1"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
goleak.IgnoreTopFunction("sync.runtime_notifyListWait"),
}
goleak.VerifyTestMain(m, opts...)
}
Loading