diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index 711e70930..9bb9a4827 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -1,6 +1,6 @@ module gcworker -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index 39e643a43..091015156 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -1,6 +1,6 @@ module rawkv -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index 179bce58d..f95ebbb94 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -1,6 +1,6 @@ module 1pc_txn -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index c63516097..ee0307ad5 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -1,6 +1,6 @@ module async_commit -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 33036f2a7..7fbb8a44d 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -1,6 +1,6 @@ module delete_range -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index b70e0325d..20af7476e 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -1,6 +1,6 @@ module txnkv -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index ccefa42c0..8207a978c 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -1,6 +1,6 @@ module pessimistic_txn -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 9a562abe2..31a2a90ba 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -1,6 +1,6 @@ module unsafedestoryrange -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 diff --git a/go.mod b/go.mod index d2ff6fc5e..9b8d05afb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2 go 1.23 require ( + github.com/VividCortex/ewma v1.2.0 github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index 60814ed10..90a890ffc 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 48d623f9f..46654aa17 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -1,6 +1,6 @@ module integration_tests -go 1.23 +go 1.23.2 require ( github.com/ninedraft/israce v0.0.3 diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index cfc240130..025d9df22 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock { func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn1, err := s.store.Begin() s.Nil(err) @@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { } func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) @@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { } func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) flushed, err := txn.GetMemBuffer().Flush(true) @@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { s.Nil(txn.Commit(context.Background())) // can see it after commit - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer txn.Rollback() val, err := txn.Get(context.Background(), []byte("key1")) @@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { failpoint.Disable("tikvclient/pipelinedCommitFail") }() for i := 0; i < 100; i++ { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) for j := 0; j < 100; j++ { @@ -246,7 +246,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { } func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } // check the result - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) keys := make([][]byte, 0, 100) @@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { s.Nil(txn.GetMemBuffer().FlushWait()) s.Nil(txn.Rollback()) s.Eventuallyf(func() bool { - txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer func() { s.Nil(txn.Rollback()) }() storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier) @@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { failpoint.Enable("tikvclient/beforeSendReqToRegion", "return") defer failpoint.Disable("tikvclient/beforeSendReqToRegion") - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) mustFlush := func(txn *transaction.KVTxn) { @@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { s.Nil(txn.Commit(context.Background())) - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100] m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")}) @@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { txn.Rollback() // empty memdb should also cache the not exist result. - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) // batch get cache: [99 -> not exist] m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")}) s.Nil(err) @@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() { atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal) - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} @@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() { restoreGlobalConfFunc() }() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} diff --git a/metrics/metrics.go b/metrics/metrics.go index ce247600c..a566473c0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -117,6 +117,7 @@ var ( TiKVPipelinedFlushDuration prometheus.Histogram TiKVValidateReadTSFromPDCount prometheus.Counter TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge + TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram ) // Label constants. @@ -852,6 +853,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.", }) + TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "pipelined_flush_throttle_seconds", + Help: "Throttle durations of pipelined flushes.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h + }) + initShortcuts() } @@ -948,6 +958,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPipelinedFlushDuration) prometheus.MustRegister(TiKVValidateReadTSFromPDCount) prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge) + prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram) } // readCounter reads the value of a prometheus.Counter. diff --git a/tikv/kv.go b/tikv/kv.go index db375ae57..9828683c1 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -80,6 +80,10 @@ const ( // DCLabelKey indicates the key of label which represents the dc for Store. DCLabelKey = "zone" safeTSUpdateInterval = time.Second * 2 + + defaultPipelinedFlushConcurrency = 128 + defaultPipelinedResolveLockConcurrency = 8 + defaultPipelinedWriteThrottleRatio = 0.0 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -944,10 +948,31 @@ func WithStartTS(startTS uint64) TxnOption { } } -// WithPipelinedMemDB creates transaction with pipelined memdb. -func WithPipelinedMemDB() TxnOption { +// WithDefaultPipelinedTxn creates pipelined txn with default parameters +func WithDefaultPipelinedTxn() TxnOption { return func(st *transaction.TxnOptions) { - st.PipelinedMemDB = true + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: defaultPipelinedFlushConcurrency, + ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, + WriteThrottleRatio: defaultPipelinedWriteThrottleRatio, + } + } +} + +// WithPipelinedTxn creates pipelined txn with specified parameters +func WithPipelinedTxn( + flushConcurrency, + resolveLockConcurrency int, + writeThrottleRatio float64, +) TxnOption { + return func(st *transaction.TxnOptions) { + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: flushConcurrency, + ResolveLockConcurrency: resolveLockConcurrency, + WriteThrottleRatio: writeThrottleRatio, + } } } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 3298b10de..6a739b361 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1102,16 +1102,28 @@ func (c *twoPhaseCommitter) doActionOnBatches( } return nil } - rateLim := len(batches) + rateLim := c.calcActionConcurrency(len(batches), action) + batchExecutor := newBatchExecutor(rateLim, c, action, bo) + return batchExecutor.process(batches) +} + +func (c *twoPhaseCommitter) calcActionConcurrency( + numBatches int, action twoPhaseCommitAction, +) int { + rateLim := numBatches // Set rateLim here for the large transaction. // If the rate limit is too high, tikv will report service is busy. // If the rate limit is too low, we can't full utilize the tikv's throughput. // TODO: Find a self-adaptive way to control the rate limit here. - if rateLim > config.GetGlobalConfig().CommitterConcurrency { - rateLim = config.GetGlobalConfig().CommitterConcurrency + switch action.(type) { + case actionPipelinedFlush: + rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency)) + default: + if rateLim > config.GetGlobalConfig().CommitterConcurrency { + rateLim = config.GetGlobalConfig().CommitterConcurrency + } } - batchExecutor := newBatchExecutor(rateLim, c, action, bo) - return batchExecutor.process(batches) + return rateLim } func (c *twoPhaseCommitter) keyValueSize(key, value []byte) int { diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index ce26d4647..c2dff6c4a 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -454,7 +454,6 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved * // resolveFlushedLocks resolves all locks in the given range [start, end) with the given status. // The resolve process is running in another goroutine so this function won't block. func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) { - const RESOLVE_CONCURRENCY = 8 var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(commit, &resolved) commitTs := uint64(0) @@ -481,7 +480,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end fmt.Sprintf("pipelined-dml-%s", status), fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS), c.store, - RESOLVE_CONCURRENCY, + c.txn.pipelinedResolveLockConcurrency, handler, ) runner.SetStatLogInterval(30 * time.Second) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index aeebceb1c..8605d79f6 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -49,6 +49,7 @@ import ( "sync/atomic" "time" + "github.com/VividCortex/ewma" "github.com/dgryski/go-farm" "github.com/docker/go-units" "github.com/opentracing/opentracing-go" @@ -74,6 +75,7 @@ import ( // MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit. // We use it to abort the transaction to guarantee GC worker will not influence it. const MaxTxnTimeUse = 24 * 60 * 60 * 1000 +const defaultEWMAAge = 10 type tempLockBufferEntry struct { HasReturnValue bool @@ -109,12 +111,20 @@ func (e *tempLockBufferEntry) trySkipLockingOnRetry(returnValue bool, checkExist return true } +type PipelinedTxnOptions struct { + Enable bool + FlushConcurrency int + ResolveLockConcurrency int + // [0,1), 0 = no sleep, 1 = no write + WriteThrottleRatio float64 +} + // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string - StartTS *uint64 - PipelinedMemDB bool + TxnScope string + StartTS *uint64 + PipelinedTxn PipelinedTxnOptions } // KVTxn contains methods to interact with a TiKV transaction. @@ -170,30 +180,39 @@ type KVTxn struct { forUpdateTSChecks map[string]uint64 - isPipelined bool - pipelinedCancel context.CancelFunc + isPipelined bool + pipelinedCancel context.CancelFunc + pipelinedFlushConcurrency int + pipelinedResolveLockConcurrency int + writeThrottleRatio float64 + // flushBatchDurationEWMA is read before each flush, and written after each flush => no race + flushBatchDurationEWMA ewma.MovingAverage } // NewTiKVTxn creates a new KVTxn. func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, options *TxnOptions) (*KVTxn, error) { cfg := config.GetGlobalConfig() newTiKVTxn := &KVTxn{ - snapshot: snapshot, - store: store, - startTS: startTS, - startTime: time.Now(), - valid: true, - vars: tikv.DefaultVars, - scope: options.TxnScope, - enableAsyncCommit: cfg.EnableAsyncCommit, - enable1PC: cfg.Enable1PC, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - RequestSource: snapshot.RequestSource, - } - if !options.PipelinedMemDB { + snapshot: snapshot, + store: store, + startTS: startTS, + startTime: time.Now(), + valid: true, + vars: tikv.DefaultVars, + scope: options.TxnScope, + enableAsyncCommit: cfg.EnableAsyncCommit, + enable1PC: cfg.Enable1PC, + diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: snapshot.RequestSource, + flushBatchDurationEWMA: ewma.NewMovingAverage(defaultEWMAAge), + } + if !options.PipelinedTxn.Enable { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot) return newTiKVTxn, nil } + newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency + newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency + newTiKVTxn.writeThrottleRatio = options.PipelinedTxn.WriteThrottleRatio if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } @@ -627,7 +646,15 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } mutations.Push(op, false, mustExist, mustNotExist, flags.HasNeedConstraintCheckInPrewrite(), it.Handle()) } - return txn.committer.pipelinedFlushMutations(bo, mutations, generation) + txn.throttle() + flushStart := time.Now() + err = txn.committer.pipelinedFlushMutations(bo, mutations, generation) + if txn.flushBatchDurationEWMA.Value() == 0 { + txn.flushBatchDurationEWMA.Set(float64(time.Since(flushStart).Milliseconds())) + } else { + txn.flushBatchDurationEWMA.Add(float64(time.Since(flushStart).Milliseconds())) + } + return err }) txn.committer.priority = txn.priority.ToPB() txn.committer.syncLog = txn.syncLog @@ -638,6 +665,34 @@ func (txn *KVTxn) InitPipelinedMemDB() error { return nil } +func (txn *KVTxn) throttle() { + if txn.writeThrottleRatio >= 1 || txn.writeThrottleRatio < 0 { + logutil.BgLogger().Error( + "[pipelined dml] invalid write speed ratio", + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + return + } + + expectedFlushMs := txn.flushBatchDurationEWMA.Value() + // T_sleep / (T_sleep + T_flush) = writeThrottleRatio + sleepMs := int(txn.writeThrottleRatio / (1.0 - txn.writeThrottleRatio) * expectedFlushMs) + metrics.TiKVPipelinedFlushThrottleSecondsHistogram.Observe(float64(sleepMs) / 1000) + if sleepMs == 0 { + return + } + logutil.BgLogger().Info( + "[pipelined dml] throttle", + zap.Int("sleepMs", sleepMs), + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + time.Sleep(time.Duration(sleepMs) * time.Millisecond) +} + // IsCasualConsistency returns if the transaction allows linearizability // inconsistency. func (txn *KVTxn) IsCasualConsistency() bool {