Skip to content

Commit

Permalink
Merge branch 'master' into p-dml/rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored Apr 4, 2024
2 parents efc590c + 5a4905d commit 907fd9b
Show file tree
Hide file tree
Showing 22 changed files with 196 additions and 69 deletions.
2 changes: 1 addition & 1 deletion examples/gcworker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/rawkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
22 changes: 6 additions & 16 deletions integration_tests/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,25 +321,15 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
}
snapshot.MergeExecDetail(detail)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"total_process_time: 100ms, total_wait_time: 100ms, " +
"scan_detail: {total_process_keys: 10, " +
"total_process_keys_size: 10, " +
"total_keys: 15, " +
"get_snapshot_time: 500ns, " +
"rocksdb: {delete_skipped_count: 5, " +
"key_skipped_count: 1, " +
"block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
"time_detail: {total_process_time: 100ms, total_wait_time: 100ms}, " +
"scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, get_snapshot_time: 500ns, " +
"rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
snapshot.MergeExecDetail(detail)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"total_process_time: 200ms, total_wait_time: 200ms, " +
"scan_detail: {total_process_keys: 20, " +
"total_process_keys_size: 20, " +
"total_keys: 30, " +
"get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, " +
"key_skipped_count: 2, " +
"block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
"time_detail: {total_process_time: 200ms, total_wait_time: 200ms}, " +
"scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
}

Expand Down
41 changes: 34 additions & 7 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchComman
zap.String("forwardedHost", forwardedHost),
zap.Error(err),
)
c.failPendingRequests(err)
c.failRequestsByIDs(err, req.RequestIds) // fast fail requests.
return
}

Expand All @@ -604,23 +604,50 @@ func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchComman
zap.Uint64s("requestIDs", req.RequestIds),
zap.Error(err),
)
c.failPendingRequests(err)
c.failRequestsByIDs(err, req.RequestIds) // fast fail requests.
}
}

// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
func (c *batchCommandsClient) failPendingRequests(err error) {
// when enable-forwarding is true, the `forwardedHost` maybe not empty.
// failPendingRequests fails all pending requests which req.forwardedHost equals to forwardedHost parameter.
// Why need check `forwardedHost`? Here is an example, when enable-forwarding is true, and this client has network issue with store1:
// - some requests are sent to store1 with forwarding, such as forwardedHost is store2, those requests will succeed.
// - some requests are sent to store1 without forwarding, and may fail then `failPendingRequests` would be called,
// if we don't check `forwardedHost` and fail all pending requests, the requests with forwarding will be failed too. this may cause some issue:
// 1. data race. see https://github.com/tikv/client-go/issues/1222 and TestRandomRestartStoreAndForwarding.
// 2. panic which cause by `send on closed channel`, since failPendingRequests will close the entry.res channel,
// but in another batchRecvLoop goroutine, it may receive the response from forwardedHost store2 and try to send the response to entry.res channel,
// then panic by send on closed channel.
func (c *batchCommandsClient) failPendingRequests(err error, forwardedHost string) {
util.EvalFailpoint("panicInFailPendingRequests")
c.batched.Range(func(key, value interface{}) bool {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
c.batched.Delete(id)
c.sent.Add(-1)
entry.error(err)
if entry.forwardedHost == forwardedHost {
c.failRequest(err, id, entry)
}
return true
})
}

// failRequestsByIDs fails requests by requestID.
func (c *batchCommandsClient) failRequestsByIDs(err error, requestIDs []uint64) {
for _, requestID := range requestIDs {
value, ok := c.batched.Load(requestID)
if !ok {
continue
}
c.failRequest(err, requestID, value.(*batchCommandsEntry))
}
}

func (c *batchCommandsClient) failRequest(err error, requestID uint64, entry *batchCommandsEntry) {
c.batched.Delete(requestID)
c.sent.Add(-1)
entry.error(err)
}

func (c *batchCommandsClient) waitConnReady() (err error) {
state := c.conn.GetState()
if state == connectivity.Ready {
Expand Down Expand Up @@ -793,7 +820,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b
}
*epoch++

c.failPendingRequests(err) // fail all pending requests.
c.failPendingRequests(err, streamClient.forwardedHost) // fail all pending requests.
b := retry.NewBackofferWithVars(context.Background(), math.MaxInt32, nil)
for { // try to re-create the streaming in the loop.
if c.isStopped() {
Expand Down
29 changes: 20 additions & 9 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/israce"
"go.uber.org/zap"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -888,9 +887,6 @@ func TestBatchClientReceiveHealthFeedback(t *testing.T) {
}

func TestRandomRestartStoreAndForwarding(t *testing.T) {
if israce.RaceEnabled {
t.Skip("skip since race bug in issue #1222")
}
store1, port1 := mockserver.StartMockTikvService()
require.True(t, port1 > 0)
require.True(t, store1.IsRunning())
Expand All @@ -908,6 +904,8 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
wg := sync.WaitGroup{}
done := int64(0)
concurrency := 500
addr1 := store1.Addr()
addr2 := store2.Addr()
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -931,7 +929,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
}
}()

conn, err := client1.getConnArray(store1.Addr(), true)
conn, err := client1.getConnArray(addr1, true)
assert.Nil(t, err)
for j := 0; j < concurrency; j++ {
wg.Add(1)
Expand All @@ -944,9 +942,9 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
forwardedHost := ""
if i%2 != 0 {
forwardedHost = store2.Addr()
forwardedHost = addr2
}
_, err := sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0)
_, err := sendBatchRequest(context.Background(), addr1, forwardedHost, conn.batchConn, req, time.Millisecond*50, 0)
if err == nil ||
err.Error() == "EOF" ||
err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||
Expand All @@ -964,11 +962,24 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
for _, cli := range conn.batchConn.batchCommandsClients {
require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load())
require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
// TODO(crazycs520): fix me, see https://github.com/tikv/client-go/pull/1219
//require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
}
}

func TestFastFailRequest(t *testing.T) {
client := NewRPCClient()
defer func() {
err := client.Close()
require.NoError(t, err)
}()
start := time.Now()
unknownAddr := "127.0.0.1:52027"
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
_, err := client.sendRequest(context.Background(), unknownAddr, req, time.Second*20)
require.Equal(t, "context deadline exceeded", errors.Cause(err).Error())
require.True(t, time.Since(start) < time.Second*6) // fast fail when dial target failed.
}

func TestErrConn(t *testing.T) {
e := errors.New("conn error")
err1 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10}
Expand Down
5 changes: 4 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ type RegionRequestSender struct {
}

func (s *RegionRequestSender) String() string {
return fmt.Sprintf("{rpcError:%v,replicaSelector: %v}", s.rpcError, s.replicaSelector.String())
if s.replicaSelector == nil {
return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector)
}
return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector.String())
}

// RegionRequestRuntimeStats records the runtime stats of send region requests.
Expand Down
10 changes: 10 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,13 @@ func (s *testRegionRequestToSingleStoreSuite) TestClientExt() {
s.NotNil(sender.client)
s.Nil(sender.getClientExt())
}

func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestSenderString() {
sender := NewRegionRequestSender(s.cache, &fnClient{})
loc, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
// invalid region cache before sending request.
s.cache.InvalidateCachedRegion(loc.Region)
sender.SendReqCtx(s.bo, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), loc.Region, time.Second, tikvrpc.TiKV)
s.Equal("{rpcError:cached region invalid, replicaSelector: <nil>}", sender.String())
}
3 changes: 2 additions & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Oracle interface {
GetExternalTimestamp(ctx context.Context) (uint64, error)
SetExternalTimestamp(ctx context.Context, ts uint64) error

GetMinTimestamp(ctx context.Context) (uint64, error)
// GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups.
GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error)
}

// Future is a future which promises to return a timestamp.
Expand Down
2 changes: 1 addition & 1 deletion oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint6
return ts, nil
}

func (l *localOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
func (l *localOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) {
l.Lock()
defer l.Unlock()
now := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64
return ts, nil
}

// GetMinTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
// GetAllTSOKeyspaceGroupMinTS implements oracle.Oracle interface.
func (o *MockOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) {
o.RLock()
defer o.RUnlock()

Expand Down
8 changes: 4 additions & 4 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64
return ts, nil
}

// GetMinTimestamp gets a minimum timestamp for all keyspace groups.
func (o *pdOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
return o.getMinTimestamp(ctx)
// GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups.
func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) {
return o.getMinTimestampInAllTSOGroup(ctx)
}

type tsFuture struct {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e
return oracle.ComposeTS(physical, logical), nil
}

func (o *pdOracle) getMinTimestamp(ctx context.Context) (uint64, error) {
func (o *pdOracle) getMinTimestampInAllTSOGroup(ctx context.Context) (uint64, error) {
now := time.Now()

physical, logical, err := o.c.GetMinTS(ctx)
Expand Down
12 changes: 6 additions & 6 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,10 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
return startTS, nil
}

// CurrentMinTimestamp returns current timestamp across all keyspace groups.
func (s *KVStore) CurrentMinTimestamp() (uint64, error) {
// CurrentAllTSOKeyspaceGroupMinTs returns a minimum timestamp from all TSO keyspace groups.
func (s *KVStore) CurrentAllTSOKeyspaceGroupMinTs() (uint64, error) {
bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil)
startTS, err := s.getMinTimestampWithRetry(bo)
startTS, err := s.getAllTSOKeyspaceGroupMinTSWithRetry(bo)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -469,15 +469,15 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64,
}
}

func (s *KVStore) getMinTimestampWithRetry(bo *Backoffer) (uint64, error) {
func (s *KVStore) getAllTSOKeyspaceGroupMinTSWithRetry(bo *Backoffer) (uint64, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("TiKVStore.getMinTimestampWithRetry", opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan("TiKVStore.getAllTSOKeyspaceGroupMinTSWithRetry", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}

for {
minTS, err := s.oracle.GetMinTimestamp(bo.GetCtx())
minTS, err := s.oracle.GetAllTSOKeyspaceGroupMinTS(bo.GetCtx())
if err == nil {
return minTS, nil
}
Expand Down
Loading

0 comments on commit 907fd9b

Please sign in to comment.