Skip to content

Commit

Permalink
log: more items for diagnosing pipelined dml (#1252)
Browse files Browse the repository at this point in the history
* log: more items for diagnosing pipelined dml

Signed-off-by: ekexium <[email protected]>

* log: flush duration

Signed-off-by: ekexium <[email protected]>

---------

Signed-off-by: ekexium <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
  • Loading branch information
ekexium and cfzjywxk authored Apr 1, 2024
1 parent 69310ff commit 356eb45
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 10 deletions.
61 changes: 54 additions & 7 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package transaction
import (
"bytes"
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -115,6 +116,12 @@ func (action actionPipelinedFlush) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) (err error) {
if len(c.primaryKey) == 0 {
logutil.Logger(bo.GetCtx()).Error(
"[pipelined dml] primary key should be set before pipelined flush",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return errors.New("[pipelined dml] primary key should be set before pipelined flush")
}

Expand All @@ -132,6 +139,8 @@ func (action actionPipelinedFlush) handleSingleBatch(
logutil.BgLogger().Warn(
"[pipelined dml] slow pipelined flush request",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
zap.Stringer("region", &batch.region),
zap.Int("attempts", attempts),
)
Expand Down Expand Up @@ -223,6 +232,7 @@ func (action actionPipelinedFlush) handleSingleBatch(
"[pipelined dml] encounters lock",
zap.Uint64("session", c.sessionID),
zap.Uint64("txnID", c.startTS),
zap.Uint64("generation", action.generation),
zap.Stringer("lock", lock),
)
// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
Expand Down Expand Up @@ -265,6 +275,13 @@ func (action actionPipelinedFlush) handleSingleBatch(
errors.Errorf("[pipelined dml] flush lockedKeys: %d", len(locks)),
)
if err != nil {
logutil.Logger(bo.GetCtx()).Warn(
"[pipelined dml] backoff failed during flush",
zap.Error(err),
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return err
}
}
Expand All @@ -284,12 +301,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio
func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
logutil.BgLogger().Info("[pipelined dml] start to commit transaction",
zap.Int("keys", c.txn.GetMemBuffer().Len()),
zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))))
zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))),
zap.Uint64("startTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
return err
}
atomic.StoreUint64(&c.commitTS, commitTS)
Expand All @@ -306,7 +328,12 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
c.mu.RLock()
c.mu.committed = true
c.mu.RUnlock()
logutil.BgLogger().Info("[pipelined dml] transaction is committed")
logutil.BgLogger().Info(
"[pipelined dml] transaction is committed",
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", commitTS),
zap.Uint64("session", c.sessionID),
)

if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil {
return nil
Expand Down Expand Up @@ -398,16 +425,36 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
var resolved atomic.Uint64
handler, err := c.buildPipelinedResolveHandler(true, &resolved)
if err != nil {
logutil.Logger(bo.GetCtx()).Error("[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err))
logutil.Logger(bo.GetCtx()).Error(
"[pipelined dml] build buildPipelinedResolveHandler error",
zap.Error(err),
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
return
}
runner := rangetask.NewRangeTaskRunner("pipelined-dml-commit", c.store, RESOLVE_CONCURRENCY, handler)
runner := rangetask.NewRangeTaskRunner(
fmt.Sprintf("pipelined-dml-commit-%d", c.startTS),
c.store,
RESOLVE_CONCURRENCY,
handler,
)
if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed",
zap.Uint64("resolved regions", resolved.Load()),
zap.Error(err))
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
zap.Error(err),
)
} else {
logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done",
zap.Uint64("resolved regions", resolved.Load()))
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
}
}
16 changes: 13 additions & 3 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,16 +451,26 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
pipelinedMemDB := unionstore.NewPipelinedMemDB(func(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
return txn.snapshot.BatchGetWithTier(ctx, keys, txnsnapshot.BatchGetBufferTier)
}, func(generation uint64, memdb *unionstore.MemDB) (err error) {
startTime := time.Now()
defer func() {
if err != nil {
txn.committer.ttlManager.close()
}
flushedKeys += memdb.Len()
flushedSize += memdb.Size()
logutil.BgLogger().Info(
"[pipelined dml] flushed memdb to kv store",
zap.Uint64("startTS", txn.startTS),
zap.Uint64("generation", generation),
zap.Uint64("session", txn.committer.sessionID),
zap.Int("keys", memdb.Len()),
zap.String("size", units.HumanSize(float64(memdb.Size()))),
zap.Int("flushed keys", flushedKeys),
zap.String("flushed size", units.HumanSize(float64(flushedSize))),
zap.Duration("take time", time.Since(startTime)),
)
}()
logutil.BgLogger().Info("[pipelined dml] flush memdb to kv store",
zap.Int("keys", memdb.Len()), zap.String("size", units.HumanSize(float64(memdb.Size()))),
zap.Int("flushed keys", flushedKeys), zap.String("flushed size", units.HumanSize(float64(flushedSize))))

// The flush function will not be called concurrently.
// TODO: set backoffer from upper context.
bo := retry.NewBackofferWithVars(flushCtx, 20000, nil)
Expand Down

0 comments on commit 356eb45

Please sign in to comment.