From 6fda9debc2e1a399013a50265c3b67301d580807 Mon Sep 17 00:00:00 2001 From: ystaticy Date: Fri, 29 Mar 2024 15:29:02 +0800 Subject: [PATCH 1/7] Change get min ts function name (#1251) --- examples/gcworker/go.mod | 2 +- examples/rawkv/go.mod | 2 +- examples/txnkv/1pc_txn/go.mod | 2 +- examples/txnkv/async_commit/go.mod | 2 +- examples/txnkv/delete_range/go.mod | 2 +- examples/txnkv/go.mod | 2 +- examples/txnkv/pessimistic_txn/go.mod | 2 +- examples/txnkv/unsafedestoryrange/go.mod | 2 +- oracle/oracle.go | 3 ++- oracle/oracles/local.go | 2 +- oracle/oracles/mock.go | 4 ++-- oracle/oracles/pd.go | 8 ++++---- tikv/kv.go | 12 ++++++------ 13 files changed, 23 insertions(+), 22 deletions(-) diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index 1da7b81d4d..2159df8033 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index a0ea6f67b6..45a1c4d08f 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect 
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index 70825849b9..224ff0bb6d 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index 046d68df60..1253bbde79 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 704dd51dae..504dd55adf 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + 
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index bb1508e216..59cb804d77 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index 491287ca24..4059cefa2b 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 39450ee27e..202531c6df 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -30,7 +30,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect 
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20240229062430-b207e514ece1 // indirect + github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect diff --git a/oracle/oracle.go b/oracle/oracle.go index a72cff7c4a..5579d3568a 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -59,7 +59,8 @@ type Oracle interface { GetExternalTimestamp(ctx context.Context) (uint64, error) SetExternalTimestamp(ctx context.Context, ts uint64) error - GetMinTimestamp(ctx context.Context) (uint64, error) + // GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups. + GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) } // Future is a future which promises to return a timestamp. diff --git a/oracle/oracles/local.go b/oracle/oracles/local.go index 018047de01..f8016f468c 100644 --- a/oracle/oracles/local.go +++ b/oracle/oracles/local.go @@ -86,7 +86,7 @@ func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint6 return ts, nil } -func (l *localOracle) GetMinTimestamp(ctx context.Context) (uint64, error) { +func (l *localOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) { l.Lock() defer l.Unlock() now := time.Now() diff --git a/oracle/oracles/mock.go b/oracle/oracles/mock.go index 6e23bd0f47..633d975371 100644 --- a/oracle/oracles/mock.go +++ b/oracle/oracles/mock.go @@ -93,8 +93,8 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64 return ts, nil } -// GetMinTimestamp implements oracle.Oracle interface. -func (o *MockOracle) GetMinTimestamp(ctx context.Context) (uint64, error) { +// GetAllTSOKeyspaceGroupMinTS implements oracle.Oracle interface. 
+func (o *MockOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) { o.RLock() defer o.RUnlock() diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 01301fd3c7..83dd41f3c3 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -115,9 +115,9 @@ func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64 return ts, nil } -// GetMinTimestamp gets a minimum timestamp for all keyspace groups. -func (o *pdOracle) GetMinTimestamp(ctx context.Context) (uint64, error) { - return o.getMinTimestamp(ctx) +// GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups. +func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) { + return o.getMinTimestampInAllTSOGroup(ctx) } type tsFuture struct { @@ -171,7 +171,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e return oracle.ComposeTS(physical, logical), nil } -func (o *pdOracle) getMinTimestamp(ctx context.Context) (uint64, error) { +func (o *pdOracle) getMinTimestampInAllTSOGroup(ctx context.Context) (uint64, error) { now := time.Now() physical, logical, err := o.c.GetMinTS(ctx) diff --git a/tikv/kv.go b/tikv/kv.go index 34b5fa24df..7d2dec3cfc 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -425,10 +425,10 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { return startTS, nil } -// CurrentMinTimestamp returns current timestamp across all keyspace groups. -func (s *KVStore) CurrentMinTimestamp() (uint64, error) { +// CurrentAllTSOKeyspaceMinTsGroup returns a minimum timestamp from all TSO keyspace groups. 
+func (s *KVStore) CurrentAllTSOKeyspaceMinTsGroup() (uint64, error) { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err := s.getMinTimestampWithRetry(bo) + startTS, err := s.getAllTSOKeyspaceGroupMinTSWithRetry(bo) if err != nil { return 0, err } @@ -469,15 +469,15 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, } } -func (s *KVStore) getMinTimestampWithRetry(bo *Backoffer) (uint64, error) { +func (s *KVStore) getAllTSOKeyspaceGroupMinTSWithRetry(bo *Backoffer) (uint64, error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("TiKVStore.getMinTimestampWithRetry", opentracing.ChildOf(span.Context())) + span1 := span.Tracer().StartSpan("TiKVStore.getAllTSOKeyspaceGroupMinTSWithRetry", opentracing.ChildOf(span.Context())) defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } for { - minTS, err := s.oracle.GetMinTimestamp(bo.GetCtx()) + minTS, err := s.oracle.GetAllTSOKeyspaceGroupMinTS(bo.GetCtx()) if err == nil { return minTS, nil } From 69310ffc497d6cec3a31c6ae90628fd96cc8d615 Mon Sep 17 00:00:00 2001 From: ystaticy Date: Fri, 29 Mar 2024 16:05:57 +0800 Subject: [PATCH 2/7] Fix CurrentAllTSOKeyspaceGroupMinTs name (#1255) --- tikv/kv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tikv/kv.go b/tikv/kv.go index 7d2dec3cfc..201596e8b5 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -425,8 +425,8 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { return startTS, nil } -// CurrentAllTSOKeyspaceMinTsGroup returns a minimum timestamp from all TSO keyspace groups. -func (s *KVStore) CurrentAllTSOKeyspaceMinTsGroup() (uint64, error) { +// CurrentAllTSOKeyspaceGroupMinTs returns a minimum timestamp from all TSO keyspace groups. 
+func (s *KVStore) CurrentAllTSOKeyspaceGroupMinTs() (uint64, error) { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) startTS, err := s.getAllTSOKeyspaceGroupMinTSWithRetry(bo) if err != nil { From 356eb45c5dcc9d1a608d4642c68532ec32dcd36d Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 1 Apr 2024 10:16:58 +0800 Subject: [PATCH 3/7] log: more items for diagnosing pipelined dml (#1252) * log: more items for diagnosing pipelined dml Signed-off-by: ekexium * log: flush duration Signed-off-by: ekexium --------- Signed-off-by: ekexium Co-authored-by: cfzjywxk --- txnkv/transaction/pipelined_flush.go | 61 ++++++++++++++++++++++++---- txnkv/transaction/txn.go | 16 ++++++-- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 2ef90cbc8a..288f5da9bb 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -17,6 +17,7 @@ package transaction import ( "bytes" "context" + "fmt" "strconv" "sync/atomic" "time" @@ -115,6 +116,12 @@ func (action actionPipelinedFlush) handleSingleBatch( c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations, ) (err error) { if len(c.primaryKey) == 0 { + logutil.Logger(bo.GetCtx()).Error( + "[pipelined dml] primary key should be set before pipelined flush", + zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), + ) return errors.New("[pipelined dml] primary key should be set before pipelined flush") } @@ -132,6 +139,8 @@ func (action actionPipelinedFlush) handleSingleBatch( logutil.BgLogger().Warn( "[pipelined dml] slow pipelined flush request", zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts), ) @@ -223,6 +232,7 @@ func (action actionPipelinedFlush) handleSingleBatch( 
"[pipelined dml] encounters lock", zap.Uint64("session", c.sessionID), zap.Uint64("txnID", c.startTS), + zap.Uint64("generation", action.generation), zap.Stringer("lock", lock), ) // If an optimistic transaction encounters a lock with larger TS, this transaction will certainly @@ -265,6 +275,13 @@ func (action actionPipelinedFlush) handleSingleBatch( errors.Errorf("[pipelined dml] flush lockedKeys: %d", len(locks)), ) if err != nil { + logutil.Logger(bo.GetCtx()).Warn( + "[pipelined dml] backoff failed during flush", + zap.Error(err), + zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), + ) return err } } @@ -284,12 +301,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { logutil.BgLogger().Info("[pipelined dml] start to commit transaction", zap.Int("keys", c.txn.GetMemBuffer().Len()), - zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size())))) + zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))), + zap.Uint64("startTS", c.startTS), + zap.Uint64("session", c.sessionID), + ) commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) if err != nil { logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed", zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) + zap.Uint64("txnStartTS", c.startTS), + zap.Uint64("session", c.sessionID), + ) return err } atomic.StoreUint64(&c.commitTS, commitTS) @@ -306,7 +328,12 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { c.mu.RLock() c.mu.committed = true c.mu.RUnlock() - logutil.BgLogger().Info("[pipelined dml] transaction is committed") + logutil.BgLogger().Info( + "[pipelined dml] transaction is committed", + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", commitTS), + zap.Uint64("session", c.sessionID), + ) if _, err := 
util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil { return nil @@ -398,16 +425,36 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(true, &resolved) if err != nil { - logutil.Logger(bo.GetCtx()).Error("[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Error( + "[pipelined dml] build buildPipelinedResolveHandler error", + zap.Error(err), + zap.Uint64("resolved regions", resolved.Load()), + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + ) return } - runner := rangetask.NewRangeTaskRunner("pipelined-dml-commit", c.store, RESOLVE_CONCURRENCY, handler) + runner := rangetask.NewRangeTaskRunner( + fmt.Sprintf("pipelined-dml-commit-%d", c.startTS), + c.store, + RESOLVE_CONCURRENCY, + handler, + ) if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil { logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed", zap.Uint64("resolved regions", resolved.Load()), - zap.Error(err)) + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + zap.Error(err), + ) } else { logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done", - zap.Uint64("resolved regions", resolved.Load())) + zap.Uint64("resolved regions", resolved.Load()), + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + ) } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 890a86d42d..c190494eb7 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -451,16 +451,26 @@ func (txn *KVTxn) InitPipelinedMemDB() error { pipelinedMemDB := unionstore.NewPipelinedMemDB(func(ctx context.Context, keys [][]byte) 
(map[string][]byte, error) { return txn.snapshot.BatchGetWithTier(ctx, keys, txnsnapshot.BatchGetBufferTier) }, func(generation uint64, memdb *unionstore.MemDB) (err error) { + startTime := time.Now() defer func() { if err != nil { txn.committer.ttlManager.close() } flushedKeys += memdb.Len() flushedSize += memdb.Size() + logutil.BgLogger().Info( + "[pipelined dml] flushed memdb to kv store", + zap.Uint64("startTS", txn.startTS), + zap.Uint64("generation", generation), + zap.Uint64("session", txn.committer.sessionID), + zap.Int("keys", memdb.Len()), + zap.String("size", units.HumanSize(float64(memdb.Size()))), + zap.Int("flushed keys", flushedKeys), + zap.String("flushed size", units.HumanSize(float64(flushedSize))), + zap.Duration("take time", time.Since(startTime)), + ) }() - logutil.BgLogger().Info("[pipelined dml] flush memdb to kv store", - zap.Int("keys", memdb.Len()), zap.String("size", units.HumanSize(float64(memdb.Size()))), - zap.Int("flushed keys", flushedKeys), zap.String("flushed size", units.HumanSize(float64(flushedSize)))) + // The flush function will not be called concurrently. // TODO: set backoffer from upper context. 
bo := retry.NewBackofferWithVars(flushCtx, 20000, nil) From 125a140034d714c1d5e8782ba36c987acb0bfd06 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 2 Apr 2024 11:26:21 +0800 Subject: [PATCH 4/7] fix data race that may cause panic and concurrency request limit bug (#1219) Signed-off-by: crazycs520 --- internal/client/client_batch.go | 41 +++++++++++++++++++++++++++------ internal/client/client_test.go | 29 +++++++++++++++-------- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index d29606f90a..5ba07c25ac 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -588,7 +588,7 @@ func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchComman zap.String("forwardedHost", forwardedHost), zap.Error(err), ) - c.failPendingRequests(err) + c.failRequestsByIDs(err, req.RequestIds) // fast fail requests. return } @@ -604,23 +604,50 @@ func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchComman zap.Uint64s("requestIDs", req.RequestIds), zap.Error(err), ) - c.failPendingRequests(err) + c.failRequestsByIDs(err, req.RequestIds) // fast fail requests. } } // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. -func (c *batchCommandsClient) failPendingRequests(err error) { +// When enable-forwarding is true, `forwardedHost` may be non-empty. +// failPendingRequests fails all pending requests whose forwardedHost equals the forwardedHost parameter. +// Why do we need to check `forwardedHost`? Here is an example: when enable-forwarding is true and this client has a network issue with store1: +// - some requests are sent to store1 with forwarding, e.g. forwardedHost is store2; those requests will succeed. 
+// - some requests are sent to store1 without forwarding and may fail, in which case `failPendingRequests` is called; +// if we don't check `forwardedHost` and fail all pending requests, the requests with forwarding will be failed too. This may cause some issues: +// 1. data race; see https://github.com/tikv/client-go/issues/1222 and TestRandomRestartStoreAndForwarding. +// 2. panic caused by `send on closed channel`, since failPendingRequests will close the entry.res channel, +// but in another batchRecvLoop goroutine, it may receive the response from forwardedHost store2 and try to send the response to entry.res channel, +// and then panic with `send on closed channel`. +func (c *batchCommandsClient) failPendingRequests(err error, forwardedHost string) { util.EvalFailpoint("panicInFailPendingRequests") c.batched.Range(func(key, value interface{}) bool { id, _ := key.(uint64) entry, _ := value.(*batchCommandsEntry) - c.batched.Delete(id) - c.sent.Add(-1) - entry.error(err) + if entry.forwardedHost == forwardedHost { + c.failRequest(err, id, entry) + } return true }) } +// failRequestsByIDs fails requests by requestID. +func (c *batchCommandsClient) failRequestsByIDs(err error, requestIDs []uint64) { + for _, requestID := range requestIDs { + value, ok := c.batched.Load(requestID) + if !ok { + continue + } + c.failRequest(err, requestID, value.(*batchCommandsEntry)) + } +} + +func (c *batchCommandsClient) failRequest(err error, requestID uint64, entry *batchCommandsEntry) { + c.batched.Delete(requestID) + c.sent.Add(-1) + entry.error(err) +} + func (c *batchCommandsClient) waitConnReady() (err error) { state := c.conn.GetState() if state == connectivity.Ready { @@ -793,7 +820,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b } *epoch++ - c.failPendingRequests(err) // fail all pending requests. + c.failPendingRequests(err, streamClient.forwardedHost) // fail pending requests with the same forwardedHost. 
b := retry.NewBackofferWithVars(context.Background(), math.MaxInt32, nil) for { // try to re-create the streaming in the loop. if c.isStopped() { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 60d282ea74..3436406ecc 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -59,7 +59,6 @@ import ( "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/util/israce" "go.uber.org/zap" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" @@ -888,9 +887,6 @@ func TestBatchClientReceiveHealthFeedback(t *testing.T) { } func TestRandomRestartStoreAndForwarding(t *testing.T) { - if israce.RaceEnabled { - t.Skip("skip since race bug in issue #1222") - } store1, port1 := mockserver.StartMockTikvService() require.True(t, port1 > 0) require.True(t, store1.IsRunning()) @@ -908,6 +904,8 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { wg := sync.WaitGroup{} done := int64(0) concurrency := 500 + addr1 := store1.Addr() + addr2 := store2.Addr() wg.Add(1) go func() { defer wg.Done() @@ -931,7 +929,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { } }() - conn, err := client1.getConnArray(store1.Addr(), true) + conn, err := client1.getConnArray(addr1, true) assert.Nil(t, err) for j := 0; j < concurrency; j++ { wg.Add(1) @@ -944,9 +942,9 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} forwardedHost := "" if i%2 != 0 { - forwardedHost = store2.Addr() + forwardedHost = addr2 } - _, err := sendBatchRequest(context.Background(), store1.Addr(), forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) + _, err := sendBatchRequest(context.Background(), addr1, forwardedHost, conn.batchConn, req, time.Millisecond*50, 0) if 
err == nil || err.Error() == "EOF" || err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" || @@ -964,11 +962,24 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { for _, cli := range conn.batchConn.batchCommandsClients { require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load()) require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load())) - // TODO(crazycs520): fix me, see https://github.com/tikv/client-go/pull/1219 - //require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load())) + require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load())) } } +func TestFastFailRequest(t *testing.T) { + client := NewRPCClient() + defer func() { + err := client.Close() + require.NoError(t, err) + }() + start := time.Now() + unknownAddr := "127.0.0.1:52027" + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}) + _, err := client.sendRequest(context.Background(), unknownAddr, req, time.Second*20) + require.Equal(t, "context deadline exceeded", errors.Cause(err).Error()) + require.True(t, time.Since(start) < time.Second*6) // fast fail when dial target failed. 
+} + func TestErrConn(t *testing.T) { e := errors.New("conn error") err1 := &ErrConn{Err: e, Addr: "127.0.0.1", Ver: 10} From 146a6329d84aa81493edec1c14a6eb41520af688 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 3 Apr 2024 11:24:15 +0800 Subject: [PATCH 5/7] *: fix panic log when call RegionRequestSender.String method (#1260) Signed-off-by: crazycs520 --- internal/locate/region_request.go | 5 ++++- internal/locate/region_request_test.go | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 221dbee771..b655031cc8 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -117,7 +117,10 @@ type RegionRequestSender struct { } func (s *RegionRequestSender) String() string { - return fmt.Sprintf("{rpcError:%v,replicaSelector: %v}", s.rpcError, s.replicaSelector.String()) + if s.replicaSelector == nil { + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector) + } + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector.String()) } // RegionRequestRuntimeStats records the runtime stats of send region requests. diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index b120440715..c4042343e9 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -818,3 +818,13 @@ func (s *testRegionRequestToSingleStoreSuite) TestClientExt() { s.NotNil(sender.client) s.Nil(sender.getClientExt()) } + +func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestSenderString() { + sender := NewRegionRequestSender(s.cache, &fnClient{}) + loc, err := s.cache.LocateRegionByID(s.bo, s.region) + s.Nil(err) + // invalid region cache before sending request. 
+ s.cache.InvalidateCachedRegion(loc.Region) + sender.SendReqCtx(s.bo, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), loc.Region, time.Second, tikvrpc.TiKV) + s.Equal("{rpcError:cached region invalid, replicaSelector: }", sender.String()) +} From c2927c0ec6ed3b3149b4d008f590ca6567c973b7 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Wed, 3 Apr 2024 12:05:38 +0800 Subject: [PATCH 6/7] cop: fix time detail merge (#1258) * time detail merge Signed-off-by: cfzjywxk * fix test Signed-off-by: cfzjywxk --------- Signed-off-by: cfzjywxk --- integration_tests/snapshot_test.go | 22 ++++++---------------- util/execdetails.go | 13 ++++++++----- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 3cf28105a1..bba5cac344 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -321,25 +321,15 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { } snapshot.MergeExecDetail(detail) expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " + - "total_process_time: 100ms, total_wait_time: 100ms, " + - "scan_detail: {total_process_keys: 10, " + - "total_process_keys_size: 10, " + - "total_keys: 15, " + - "get_snapshot_time: 500ns, " + - "rocksdb: {delete_skipped_count: 5, " + - "key_skipped_count: 1, " + - "block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}" + "time_detail: {total_process_time: 100ms, total_wait_time: 100ms}, " + + "scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, get_snapshot_time: 500ns, " + + "rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}" s.Equal(expect, snapshot.FormatStats()) snapshot.MergeExecDetail(detail) expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " + - "total_process_time: 200ms, total_wait_time: 200ms, " + - 
"scan_detail: {total_process_keys: 20, " + - "total_process_keys_size: 20, " + - "total_keys: 30, " + - "get_snapshot_time: 1µs, " + - "rocksdb: {delete_skipped_count: 10, " + - "key_skipped_count: 2, " + - "block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}" + "time_detail: {total_process_time: 200ms, total_wait_time: 200ms}, " + + "scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " + + "rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}" s.Equal(expect, snapshot.FormatStats()) } diff --git a/util/execdetails.go b/util/execdetails.go index a9db7a7cea..2fd0584368 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -635,6 +635,7 @@ func (td *TimeDetail) String() string { return "" } buf := bytes.NewBuffer(make([]byte, 0, 16)) + buf.WriteString("time_detail: {") if td.ProcessTime > 0 { buf.WriteString("total_process_time: ") buf.WriteString(FormatDuration(td.ProcessTime)) @@ -667,17 +668,19 @@ func (td *TimeDetail) String() string { buf.WriteString("tikv_wall_time: ") buf.WriteString(FormatDuration(td.TotalRPCWallTime)) } + buf.WriteString("}") return buf.String() } // Merge merges the time detail into itself. +// Note this function could be called concurrently. 
func (td *TimeDetail) Merge(detail *TimeDetail) { if detail != nil { - td.ProcessTime += detail.ProcessTime - td.SuspendTime += detail.SuspendTime - td.WaitTime += detail.WaitTime - td.KvReadWallTime += detail.KvReadWallTime - td.TotalRPCWallTime += detail.TotalRPCWallTime + atomic.AddInt64((*int64)(&td.ProcessTime), int64(detail.ProcessTime)) + atomic.AddInt64((*int64)(&td.SuspendTime), int64(detail.SuspendTime)) + atomic.AddInt64((*int64)(&td.WaitTime), int64(detail.WaitTime)) + atomic.AddInt64((*int64)(&td.KvReadWallTime), int64(detail.KvReadWallTime)) + atomic.AddInt64((*int64)(&td.TotalRPCWallTime), int64(detail.TotalRPCWallTime)) } } From 5a4905d2f5534dcf4238e1bdb19cde47775e6081 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 3 Apr 2024 13:22:40 +0800 Subject: [PATCH 7/7] refine timedetail string (#1261) * refine timedetail string Signed-off-by: crazycs520 * add test Signed-off-by: crazycs520 * refine test Signed-off-by: crazycs520 --------- Signed-off-by: crazycs520 --- util/execdetails.go | 7 ++++--- util/misc_test.go | 13 +++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/util/execdetails.go b/util/execdetails.go index 2fd0584368..b49298d21f 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -635,7 +635,6 @@ func (td *TimeDetail) String() string { return "" } buf := bytes.NewBuffer(make([]byte, 0, 16)) - buf.WriteString("time_detail: {") if td.ProcessTime > 0 { buf.WriteString("total_process_time: ") buf.WriteString(FormatDuration(td.ProcessTime)) @@ -668,8 +667,10 @@ func (td *TimeDetail) String() string { buf.WriteString("tikv_wall_time: ") buf.WriteString(FormatDuration(td.TotalRPCWallTime)) } - buf.WriteString("}") - return buf.String() + if buf.Len() == 0 { + return "" + } + return "time_detail: {" + buf.String() + "}" } // Merge merges the time detail into itself. 
diff --git a/util/misc_test.go b/util/misc_test.go index ae819ac7f9..0f85230646 100644 --- a/util/misc_test.go +++ b/util/misc_test.go @@ -88,3 +88,16 @@ func TestCompatibleParseGCTime(t *testing.T) { assert.NotNil(err) } } + +func TestTimeDetail(t *testing.T) { + detail := &TimeDetail{KvReadWallTime: time.Millisecond * 2, TotalRPCWallTime: time.Millisecond * 3} + assert.Equal(t, "time_detail: {total_kv_read_wall_time: 2ms, tikv_wall_time: 3ms}", detail.String()) + detail = &TimeDetail{ + ProcessTime: time.Millisecond * 2, + SuspendTime: time.Millisecond * 3, + WaitTime: time.Millisecond * 4, + KvReadWallTime: time.Millisecond * 5, + TotalRPCWallTime: time.Millisecond * 6, + } + assert.Equal(t, "time_detail: {total_process_time: 2ms, total_suspend_time: 3ms, total_wait_time: 4ms, total_kv_read_wall_time: 5ms, tikv_wall_time: 6ms}", detail.String()) +}