From 4618da43c9555116d179eed024981026e92ea6a2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Sat, 26 Oct 2024 00:38:26 +0800 Subject: [PATCH 1/5] feat: allow setting flush concurrency Signed-off-by: ekexium --- config/config.go | 3 ++- examples/gcworker/go.mod | 4 ++-- examples/rawkv/go.mod | 4 ++-- examples/txnkv/1pc_txn/go.mod | 4 ++-- examples/txnkv/async_commit/go.mod | 4 ++-- examples/txnkv/delete_range/go.mod | 4 ++-- examples/txnkv/go.mod | 4 ++-- examples/txnkv/pessimistic_txn/go.mod | 4 ++-- examples/txnkv/unsafedestoryrange/go.mod | 4 ++-- integration_tests/go.mod | 2 +- txnkv/transaction/2pc.go | 22 +++++++++++++++++----- 11 files changed, 36 insertions(+), 23 deletions(-) diff --git a/config/config.go b/config/config.go index 8b062645a..ad214e358 100644 --- a/config/config.go +++ b/config/config.go @@ -48,7 +48,8 @@ import ( ) var ( - globalConf atomic.Value + globalConf atomic.Value + PipelinedFlushConcurrency atomic.Uint32 ) const ( diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index a81da93f2..f03906cc2 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -1,6 +1,6 @@ module gcworker -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index ef7c9279a..e41ccccc6 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -1,6 +1,6 @@ module rawkv -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index f7dcef09c..880fb8a6c 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -1,6 +1,6 @@ module 1pc_txn -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index a09833288..d9957ef55 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -1,6 +1,6 @@ module async_commit -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 79b958fe0..76f210420 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -1,6 +1,6 @@ module delete_range -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index 19ca706d3..c0a8c9a08 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -1,6 +1,6 @@ module txnkv -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index dd5e13bd4..0d590d679 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -1,6 +1,6 @@ module pessimistic_txn -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 38911f96f..05105cdfb 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -1,6 +1,6 @@ module unsafedestoryrange -go 1.23 +go 1.23.2 require github.com/tikv/client-go/v2 v2.0.0 @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/integration_tests/go.mod b/integration_tests/go.mod index efff05159..df4630f3e 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -1,6 +1,6 @@ module integration_tests -go 1.23 +go 1.23.2 require ( github.com/ninedraft/israce v0.0.3 diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 11c1ed0bd..bdcffca25 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1102,16 +1102,28 @@ func (c *twoPhaseCommitter) doActionOnBatches( } return nil } - rateLim := len(batches) + rateLim := c.calcActionConcurrency(len(batches), action) + batchExecutor := newBatchExecutor(rateLim, c, action, bo) + return batchExecutor.process(batches) +} + +func (c *twoPhaseCommitter) calcActionConcurrency( + numBatches int, action twoPhaseCommitAction, +) int { + rateLim := numBatches // Set rateLim here for the large transaction. // If the rate limit is too high, tikv will report service is busy. // If the rate limit is too low, we can't full utilize the tikv's throughput. // TODO: Find a self-adaptive way to control the rate limit here. - if rateLim > config.GetGlobalConfig().CommitterConcurrency { - rateLim = config.GetGlobalConfig().CommitterConcurrency + switch action.(type) { + case actionPipelinedFlush: + rateLim = min(rateLim, max(1, int(config.PipelinedFlushConcurrency.Load()))) + default: + if rateLim > config.GetGlobalConfig().CommitterConcurrency { + rateLim = config.GetGlobalConfig().CommitterConcurrency + } } - batchExecutor := newBatchExecutor(rateLim, c, action, bo) - return batchExecutor.process(batches) + return rateLim } func (c *twoPhaseCommitter) keyValueSize(key, value []byte) int { From 8de9ca0589f3a93a7391f813a6e9a7b6efdd8743 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 13 Nov 2024 15:55:10 +0800 Subject: [PATCH 2/5] feat: allow setting resolved lock concurrency for pipelined dml Signed-off-by: ekexium --- config/config.go | 11 ++++++++--- txnkv/transaction/pipelined_flush.go | 4 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index ad214e358..184fe72e0 100644 --- a/config/config.go +++ b/config/config.go @@ -48,18 +48,23 @@ import ( ) var ( - globalConf atomic.Value - PipelinedFlushConcurrency atomic.Uint32 + globalConf atomic.Value + PipelinedFlushConcurrency atomic.Uint32 + PipelinedResolveConcurrency atomic.Uint32 ) const ( // DefStoresRefreshInterval is the default value of StoresRefreshInterval - DefStoresRefreshInterval = 60 + DefStoresRefreshInterval = 60 + DefPipelinedFlushConcurrency = 128 + DefPipelinedResolveConcurrency = 8 ) func init() { conf := DefaultConfig() StoreGlobalConfig(&conf) + PipelinedFlushConcurrency.Store(DefPipelinedFlushConcurrency) + PipelinedResolveConcurrency.Store(DefPipelinedResolveConcurrency) } // Config contains configuration options. diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 8cadf63df..35d5c6df4 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -454,7 +455,6 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved * // resolveFlushedLocks resolves all locks in the given range [start, end) with the given status. // The resolve process is running in another goroutine so this function won't block. func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) { - const RESOLVE_CONCURRENCY = 8 var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(commit, &resolved) commitTs := uint64(0) @@ -481,7 +481,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end fmt.Sprintf("pipelined-dml-%s", status), fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS), c.store, - RESOLVE_CONCURRENCY, + int(config.PipelinedResolveConcurrency.Load()), handler, ) runner.SetStatLogInterval(30 * time.Second) From 58f362c71acbf3ec4c732d80ba37d751aff40eb2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 13 Nov 2024 19:43:21 +0800 Subject: [PATCH 3/5] do not allow dynamic adjustment Signed-off-by: ekexium --- config/config.go | 10 ++------ integration_tests/pipelined_memdb_test.go | 30 +++++++++++------------ tikv/kv.go | 24 +++++++++++++++--- txnkv/transaction/2pc.go | 2 +- txnkv/transaction/pipelined_flush.go | 3 +-- txnkv/transaction/txn.go | 22 ++++++++++++----- 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/config/config.go b/config/config.go index 184fe72e0..8b062645a 100644 --- a/config/config.go +++ b/config/config.go @@ -48,23 +48,17 @@ import ( ) var ( - globalConf atomic.Value - PipelinedFlushConcurrency atomic.Uint32 - PipelinedResolveConcurrency atomic.Uint32 + globalConf atomic.Value ) const ( // DefStoresRefreshInterval is the default value of StoresRefreshInterval - DefStoresRefreshInterval = 60 - DefPipelinedFlushConcurrency = 128 - DefPipelinedResolveConcurrency = 8 + DefStoresRefreshInterval = 60 ) func init() { conf := DefaultConfig() StoreGlobalConfig(&conf) - PipelinedFlushConcurrency.Store(DefPipelinedFlushConcurrency) - PipelinedResolveConcurrency.Store(DefPipelinedResolveConcurrency) } // Config contains configuration options. diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index cfc240130..025d9df22 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock { func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn1, err := s.store.Begin() s.Nil(err) @@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { } func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) @@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { } func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) flushed, err := txn.GetMemBuffer().Flush(true) @@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { s.Nil(txn.Commit(context.Background())) // can see it after commit - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer txn.Rollback() val, err := txn.Get(context.Background(), []byte("key1")) @@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { failpoint.Disable("tikvclient/pipelinedCommitFail") }() for i := 0; i < 100; i++ { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) for j := 0; j < 100; j++ { @@ -246,7 +246,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { } func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } // check the result - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) keys := make([][]byte, 0, 100) @@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { s.Nil(txn.GetMemBuffer().FlushWait()) s.Nil(txn.Rollback()) s.Eventuallyf(func() bool { - txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer func() { s.Nil(txn.Rollback()) }() storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier) @@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { failpoint.Enable("tikvclient/beforeSendReqToRegion", "return") defer failpoint.Disable("tikvclient/beforeSendReqToRegion") - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) mustFlush := func(txn *transaction.KVTxn) { @@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { s.Nil(txn.Commit(context.Background())) - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100] m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")}) @@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { txn.Rollback() // empty memdb should also cache the not exist result. - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) // batch get cache: [99 -> not exist] m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")}) s.Nil(err) @@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() { atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal) - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} @@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() { restoreGlobalConfFunc() }() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} diff --git a/tikv/kv.go b/tikv/kv.go index db375ae57..d7d5871d6 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -80,6 +80,9 @@ const ( // DCLabelKey indicates the key of label which represents the dc for Store. DCLabelKey = "zone" safeTSUpdateInterval = time.Second * 2 + + defaultPipelinedFlushConcurrency = 128 + defaultPipelinedResolveLockConcurrency = 8 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -944,10 +947,25 @@ func WithStartTS(startTS uint64) TxnOption { } } -// WithPipelinedMemDB creates transaction with pipelined memdb. -func WithPipelinedMemDB() TxnOption { +// WithDefaultPipelinedTxn creates pipelined txn with default parameters +func WithDefaultPipelinedTxn() TxnOption { return func(st *transaction.TxnOptions) { - st.PipelinedMemDB = true + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: defaultPipelinedFlushConcurrency, + ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, + } + } +} + +// WithPipelinedTxn creates pipelined txn with specified parameters +func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption { + return func(st *transaction.TxnOptions) { + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: flushConcurrency, + ResolveLockConcurrency: resolveLockConcurrency, + } } } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index bdcffca25..bc46eb1d0 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1117,7 +1117,7 @@ func (c *twoPhaseCommitter) calcActionConcurrency( // TODO: Find a self-adaptive way to control the rate limit here. switch action.(type) { case actionPipelinedFlush: - rateLim = min(rateLim, max(1, int(config.PipelinedFlushConcurrency.Load()))) + rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency)) default: if rateLim > config.GetGlobalConfig().CommitterConcurrency { rateLim = config.GetGlobalConfig().CommitterConcurrency diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 35d5c6df4..931a5a219 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -481,7 +480,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end fmt.Sprintf("pipelined-dml-%s", status), fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS), c.store, - int(config.PipelinedResolveConcurrency.Load()), + c.txn.pipelinedResolveLockConcurrency, handler, ) runner.SetStatLogInterval(30 * time.Second) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 7c124a678..14b06fb64 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -109,12 +109,18 @@ func (e *tempLockBufferEntry) trySkipLockingOnRetry(returnValue bool, checkExist return true } +type PipelinedTxnOptions struct { + Enable bool + FlushConcurrency int + ResolveLockConcurrency int +} + // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string - StartTS *uint64 - PipelinedMemDB bool + TxnScope string + StartTS *uint64 + PipelinedTxn PipelinedTxnOptions } // KVTxn contains methods to interact with a TiKV transaction. @@ -170,8 +176,10 @@ type KVTxn struct { forUpdateTSChecks map[string]uint64 - isPipelined bool - pipelinedCancel context.CancelFunc + isPipelined bool + pipelinedCancel context.CancelFunc + pipelinedFlushConcurrency int + pipelinedResolveLockConcurrency int } // NewTiKVTxn creates a new KVTxn. @@ -190,10 +198,12 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, RequestSource: snapshot.RequestSource, } - if !options.PipelinedMemDB { + if !options.PipelinedTxn.Enable { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot) return newTiKVTxn, nil } + newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency + newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } From d3adfc2e29791cf8b0c18424dd15325f2ce04f26 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 5 Dec 2024 22:49:46 +0800 Subject: [PATCH 4/5] pipelined dml throttle Signed-off-by: ekexium --- go.mod | 1 + go.sum | 2 ++ tikv/kv.go | 6 +++- txnkv/transaction/txn.go | 68 +++++++++++++++++++++++++++++++++------- 4 files changed, 64 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index d2ff6fc5e..9b8d05afb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2 go 1.23 require ( + github.com/VividCortex/ewma v1.2.0 github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index 60814ed10..90a890ffc 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/tikv/kv.go b/tikv/kv.go index d7d5871d6..5a8dbb673 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -83,6 +83,7 @@ const ( defaultPipelinedFlushConcurrency = 128 defaultPipelinedResolveLockConcurrency = 8 + defaultPipelinedWriteSpeedRatio = 1.0 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -954,17 +955,20 @@ func WithDefaultPipelinedTxn() TxnOption { Enable: true, FlushConcurrency: defaultPipelinedFlushConcurrency, ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, + WriteSpeedRatio: defaultPipelinedWriteSpeedRatio, } } } // WithPipelinedTxn creates pipelined txn with specified parameters -func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption { +func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int, + writeSpeedRatio float64) TxnOption { return func(st *transaction.TxnOptions) { st.PipelinedTxn = transaction.PipelinedTxnOptions{ Enable: true, FlushConcurrency: flushConcurrency, ResolveLockConcurrency: resolveLockConcurrency, + WriteSpeedRatio: writeSpeedRatio, } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index fce299b27..23addcf4d 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -49,6 +49,7 @@ import ( "sync/atomic" "time" + "github.com/VividCortex/ewma" "github.com/dgryski/go-farm" "github.com/docker/go-units" "github.com/opentracing/opentracing-go" @@ -74,6 +75,7 @@ import ( // MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit. // We use it to abort the transaction to guarantee GC worker will not influence it. const MaxTxnTimeUse = 24 * 60 * 60 * 1000 +const defaultEWMAAge = 10 type tempLockBufferEntry struct { HasReturnValue bool @@ -113,6 +115,8 @@ type PipelinedTxnOptions struct { Enable bool FlushConcurrency int ResolveLockConcurrency int + // (0,1], 1 = no sleep + WriteSpeedRatio float64 } // TxnOptions indicates the option when beginning a transaction. @@ -180,23 +184,27 @@ type KVTxn struct { pipelinedCancel context.CancelFunc pipelinedFlushConcurrency int pipelinedResolveLockConcurrency int + writeSpeedRatio float64 + // flushBatchDurationEWMA is read before each flush, and written after each flush => no race + flushBatchDurationEWMA ewma.MovingAverage } // NewTiKVTxn creates a new KVTxn. func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, options *TxnOptions) (*KVTxn, error) { cfg := config.GetGlobalConfig() newTiKVTxn := &KVTxn{ - snapshot: snapshot, - store: store, - startTS: startTS, - startTime: time.Now(), - valid: true, - vars: tikv.DefaultVars, - scope: options.TxnScope, - enableAsyncCommit: cfg.EnableAsyncCommit, - enable1PC: cfg.Enable1PC, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - RequestSource: snapshot.RequestSource, + snapshot: snapshot, + store: store, + startTS: startTS, + startTime: time.Now(), + valid: true, + vars: tikv.DefaultVars, + scope: options.TxnScope, + enableAsyncCommit: cfg.EnableAsyncCommit, + enable1PC: cfg.Enable1PC, + diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: snapshot.RequestSource, + flushBatchDurationEWMA: ewma.NewMovingAverage(defaultEWMAAge), } if !options.PipelinedTxn.Enable { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot) @@ -204,6 +212,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, } newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency + newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } @@ -637,7 +646,15 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } mutations.Push(op, false, mustExist, mustNotExist, flags.HasNeedConstraintCheckInPrewrite(), it.Handle()) } - return txn.committer.pipelinedFlushMutations(bo, mutations, generation) + txn.throttle() + flushStart := time.Now() + err = txn.committer.pipelinedFlushMutations(bo, mutations, generation) + if txn.flushBatchDurationEWMA.Value() == 0 { + txn.flushBatchDurationEWMA.Set(float64(time.Since(flushStart).Milliseconds())) + } else { + txn.flushBatchDurationEWMA.Add(float64(time.Since(flushStart).Milliseconds())) + } + return err }) txn.committer.priority = txn.priority.ToPB() txn.committer.syncLog = txn.syncLog @@ -648,6 +665,33 @@ func (txn *KVTxn) InitPipelinedMemDB() error { return nil } +func (txn *KVTxn) throttle() { + if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 { + logutil.BgLogger().Error( + "[pipelined dml] invalid write speed ratio", + zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + return + } + + expectedFlushMs := txn.flushBatchDurationEWMA.Value() + // T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio + sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs) + if sleepMs == 0 { + return + } + logutil.BgLogger().Info( + "[pipelined dml] throttle", + zap.Int("sleepMs", sleepMs), + zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + time.Sleep(time.Duration(sleepMs) * time.Millisecond) +} + // IsCasualConsistency returns if the transaction allows linearizability // inconsistency. func (txn *KVTxn) IsCasualConsistency() bool { From 6b75b34db81fd166ca3ec77508bb3cc0d7d210f2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 10 Dec 2024 16:53:14 +0800 Subject: [PATCH 5/5] change name to write_throttle_ratio Signed-off-by: ekexium --- metrics/metrics.go | 11 +++++++++++ tikv/kv.go | 13 ++++++++----- txnkv/transaction/txn.go | 19 ++++++++++--------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index ce247600c..a566473c0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -117,6 +117,7 @@ var ( TiKVPipelinedFlushDuration prometheus.Histogram TiKVValidateReadTSFromPDCount prometheus.Counter TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge + TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram ) // Label constants. @@ -852,6 +853,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.", }) + TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "pipelined_flush_throttle_seconds", + Help: "Throttle durations of pipelined flushes.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h + }) + initShortcuts() } @@ -948,6 +958,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPipelinedFlushDuration) prometheus.MustRegister(TiKVValidateReadTSFromPDCount) prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge) + prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram) } // readCounter reads the value of a prometheus.Counter. diff --git a/tikv/kv.go b/tikv/kv.go index 5a8dbb673..9828683c1 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -83,7 +83,7 @@ const ( defaultPipelinedFlushConcurrency = 128 defaultPipelinedResolveLockConcurrency = 8 - defaultPipelinedWriteSpeedRatio = 1.0 + defaultPipelinedWriteThrottleRatio = 0.0 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -955,20 +955,23 @@ func WithDefaultPipelinedTxn() TxnOption { Enable: true, FlushConcurrency: defaultPipelinedFlushConcurrency, ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, - WriteSpeedRatio: defaultPipelinedWriteSpeedRatio, + WriteThrottleRatio: defaultPipelinedWriteThrottleRatio, } } } // WithPipelinedTxn creates pipelined txn with specified parameters -func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int, - writeSpeedRatio float64) TxnOption { +func WithPipelinedTxn( + flushConcurrency, + resolveLockConcurrency int, + writeThrottleRatio float64, +) TxnOption { return func(st *transaction.TxnOptions) { st.PipelinedTxn = transaction.PipelinedTxnOptions{ Enable: true, FlushConcurrency: flushConcurrency, ResolveLockConcurrency: resolveLockConcurrency, - WriteSpeedRatio: writeSpeedRatio, + WriteThrottleRatio: writeThrottleRatio, } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 23addcf4d..8605d79f6 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -115,8 +115,8 @@ type PipelinedTxnOptions struct { Enable bool FlushConcurrency int ResolveLockConcurrency int - // (0,1], 1 = no sleep - WriteSpeedRatio float64 + // [0,1), 0 = no sleep, 1 = no write + WriteThrottleRatio float64 } // TxnOptions indicates the option when beginning a transaction. @@ -184,7 +184,7 @@ type KVTxn struct { pipelinedCancel context.CancelFunc pipelinedFlushConcurrency int pipelinedResolveLockConcurrency int - writeSpeedRatio float64 + writeThrottleRatio float64 // flushBatchDurationEWMA is read before each flush, and written after each flush => no race flushBatchDurationEWMA ewma.MovingAverage } @@ -212,7 +212,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, } newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency - newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio + newTiKVTxn.writeThrottleRatio = options.PipelinedTxn.WriteThrottleRatio if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } @@ -666,10 +666,10 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } func (txn *KVTxn) throttle() { - if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 { + if txn.writeThrottleRatio >= 1 || txn.writeThrottleRatio < 0 { logutil.BgLogger().Error( "[pipelined dml] invalid write speed ratio", - zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), zap.Uint64("session", txn.committer.sessionID), zap.Uint64("startTS", txn.startTS), ) @@ -677,15 +677,16 @@ func (txn *KVTxn) throttle() { } expectedFlushMs := txn.flushBatchDurationEWMA.Value() - // T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio - sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs) + // T_sleep / (T_sleep + T_flush) = writeThrottleRatio + sleepMs := int(txn.writeThrottleRatio / (1.0 - txn.writeThrottleRatio) * expectedFlushMs) + metrics.TiKVPipelinedFlushThrottleSecondsHistogram.Observe(float64(sleepMs) / 1000) if sleepMs == 0 { return } logutil.BgLogger().Info( "[pipelined dml] throttle", zap.Int("sleepMs", sleepMs), - zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), zap.Uint64("session", txn.committer.sessionID), zap.Uint64("startTS", txn.startTS), )