diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 2ef90cbc8..288f5da9b 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -17,6 +17,7 @@ package transaction import ( "bytes" "context" + "fmt" "strconv" "sync/atomic" "time" @@ -115,6 +116,12 @@ func (action actionPipelinedFlush) handleSingleBatch( c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations, ) (err error) { if len(c.primaryKey) == 0 { + logutil.Logger(bo.GetCtx()).Error( + "[pipelined dml] primary key should be set before pipelined flush", + zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), + ) return errors.New("[pipelined dml] primary key should be set before pipelined flush") } @@ -132,6 +139,8 @@ func (action actionPipelinedFlush) handleSingleBatch( logutil.BgLogger().Warn( "[pipelined dml] slow pipelined flush request", zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts), ) @@ -223,6 +232,7 @@ func (action actionPipelinedFlush) handleSingleBatch( "[pipelined dml] encounters lock", zap.Uint64("session", c.sessionID), zap.Uint64("txnID", c.startTS), + zap.Uint64("generation", action.generation), zap.Stringer("lock", lock), ) // If an optimistic transaction encounters a lock with larger TS, this transaction will certainly @@ -265,6 +275,13 @@ func (action actionPipelinedFlush) handleSingleBatch( errors.Errorf("[pipelined dml] flush lockedKeys: %d", len(locks)), ) if err != nil { + logutil.Logger(bo.GetCtx()).Warn( + "[pipelined dml] backoff failed during flush", + zap.Error(err), + zap.Uint64("startTS", c.startTS), + zap.Uint64("generation", action.generation), + zap.Uint64("session", c.sessionID), + ) return err } } @@ -284,12 +301,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { logutil.BgLogger().Info("[pipelined dml] start to commit transaction", zap.Int("keys", c.txn.GetMemBuffer().Len()), - zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size())))) + zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))), + zap.Uint64("startTS", c.startTS), + zap.Uint64("session", c.sessionID), + ) commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) if err != nil { logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed", zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) + zap.Uint64("txnStartTS", c.startTS), + zap.Uint64("session", c.sessionID), + ) return err } atomic.StoreUint64(&c.commitTS, commitTS) @@ -306,7 +328,12 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { c.mu.RLock() c.mu.committed = true c.mu.RUnlock() - logutil.BgLogger().Info("[pipelined dml] transaction is committed") + logutil.BgLogger().Info( + "[pipelined dml] transaction is committed", + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", commitTS), + zap.Uint64("session", c.sessionID), + ) if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil { return nil @@ -398,16 +425,36 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(true, &resolved) if err != nil { - logutil.Logger(bo.GetCtx()).Error("[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Error( + "[pipelined dml] build buildPipelinedResolveHandler error", + zap.Error(err), + zap.Uint64("resolved regions", resolved.Load()), + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + ) return } - runner := rangetask.NewRangeTaskRunner("pipelined-dml-commit", c.store, RESOLVE_CONCURRENCY, handler) + runner := rangetask.NewRangeTaskRunner( + fmt.Sprintf("pipelined-dml-commit-%d", c.startTS), + c.store, + RESOLVE_CONCURRENCY, + handler, + ) if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil { logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed", zap.Uint64("resolved regions", resolved.Load()), - zap.Error(err)) + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + zap.Error(err), + ) } else { logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done", - zap.Uint64("resolved regions", resolved.Load())) + zap.Uint64("resolved regions", resolved.Load()), + zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("session", c.sessionID), + ) } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 890a86d42..c190494eb 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -451,16 +451,26 @@ func (txn *KVTxn) InitPipelinedMemDB() error { pipelinedMemDB := unionstore.NewPipelinedMemDB(func(ctx context.Context, keys [][]byte) (map[string][]byte, error) { return txn.snapshot.BatchGetWithTier(ctx, keys, txnsnapshot.BatchGetBufferTier) }, func(generation uint64, memdb *unionstore.MemDB) (err error) { + startTime := time.Now() defer func() { if err != nil { txn.committer.ttlManager.close() } flushedKeys += memdb.Len() flushedSize += memdb.Size() + logutil.BgLogger().Info( + "[pipelined dml] flushed memdb to kv store", + zap.Uint64("startTS", txn.startTS), + zap.Uint64("generation", generation), + zap.Uint64("session", txn.committer.sessionID), + zap.Int("keys", memdb.Len()), + zap.String("size", units.HumanSize(float64(memdb.Size()))), + zap.Int("flushed keys", flushedKeys), + zap.String("flushed size", units.HumanSize(float64(flushedSize))), + zap.Duration("take time", time.Since(startTime)), + ) }() - logutil.BgLogger().Info("[pipelined dml] flush memdb to kv store", - zap.Int("keys", memdb.Len()), zap.String("size", units.HumanSize(float64(memdb.Size()))), - zap.Int("flushed keys", flushedKeys), zap.String("flushed size", units.HumanSize(float64(flushedSize)))) + // The flush function will not be called concurrently. // TODO: set backoffer from upper context. bo := retry.NewBackofferWithVars(flushCtx, 20000, nil)