diff --git a/config/config.go b/config/config.go index 184fe72e0..8b062645a 100644 --- a/config/config.go +++ b/config/config.go @@ -48,23 +48,17 @@ import ( ) var ( - globalConf atomic.Value - PipelinedFlushConcurrency atomic.Uint32 - PipelinedResolveConcurrency atomic.Uint32 + globalConf atomic.Value ) const ( // DefStoresRefreshInterval is the default value of StoresRefreshInterval - DefStoresRefreshInterval = 60 - DefPipelinedFlushConcurrency = 128 - DefPipelinedResolveConcurrency = 8 + DefStoresRefreshInterval = 60 ) func init() { conf := DefaultConfig() StoreGlobalConfig(&conf) - PipelinedFlushConcurrency.Store(DefPipelinedFlushConcurrency) - PipelinedResolveConcurrency.Store(DefPipelinedResolveConcurrency) } // Config contains configuration options. diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index cfc240130..025d9df22 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock { func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn1, err := s.store.Begin() s.Nil(err) @@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() { func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { ctx := context.Background() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() { } func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) @@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() { } func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) flushed, err := txn.GetMemBuffer().Flush(true) @@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() { s.Nil(txn.Commit(context.Background())) // can see it after commit - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer txn.Rollback() val, err := txn.Get(context.Background(), []byte("key1")) @@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { failpoint.Disable("tikvclient/pipelinedCommitFail") }() for i := 0; i < 100; i++ { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) for j := 0; j < 100; j++ { @@ -246,7 +246,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() { } func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } // check the result - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) for i := 0; i < 100; i++ { key := []byte(strconv.Itoa(i)) @@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() { } func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) startTS := txn.StartTS() s.Nil(err) keys := make([][]byte, 0, 100) @@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { s.Nil(txn.GetMemBuffer().FlushWait()) s.Nil(txn.Rollback()) s.Eventuallyf(func() bool { - txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn()) s.Nil(err) defer func() { s.Nil(txn.Rollback()) }() storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier) @@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() { func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { failpoint.Enable("tikvclient/beforeSendReqToRegion", "return") defer failpoint.Disable("tikvclient/beforeSendReqToRegion") - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) mustFlush := func(txn *transaction.KVTxn) { @@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { s.Nil(txn.Commit(context.Background())) - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100] m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")}) @@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() { txn.Rollback() // empty memdb should also cache the not exist result. - txn, err = s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn()) // batch get cache: [99 -> not exist] m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")}) s.Nil(err) @@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() { atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal) - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} @@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() { restoreGlobalConfFunc() }() - txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn()) s.Nil(err) txn.Set([]byte("key1"), []byte("value1")) txnProbe := transaction.TxnProbe{KVTxn: txn} diff --git a/tikv/kv.go b/tikv/kv.go index db375ae57..d7d5871d6 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -80,6 +80,9 @@ const ( // DCLabelKey indicates the key of label which represents the dc for Store. DCLabelKey = "zone" safeTSUpdateInterval = time.Second * 2 + + defaultPipelinedFlushConcurrency = 128 + defaultPipelinedResolveLockConcurrency = 8 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -944,10 +947,25 @@ func WithStartTS(startTS uint64) TxnOption { } } -// WithPipelinedMemDB creates transaction with pipelined memdb. -func WithPipelinedMemDB() TxnOption { +// WithDefaultPipelinedTxn creates pipelined txn with default parameters +func WithDefaultPipelinedTxn() TxnOption { return func(st *transaction.TxnOptions) { - st.PipelinedMemDB = true + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: defaultPipelinedFlushConcurrency, + ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, + } + } +} + +// WithPipelinedTxn creates pipelined txn with specified parameters +func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption { + return func(st *transaction.TxnOptions) { + st.PipelinedTxn = transaction.PipelinedTxnOptions{ + Enable: true, + FlushConcurrency: flushConcurrency, + ResolveLockConcurrency: resolveLockConcurrency, + } } } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index bdcffca25..bc46eb1d0 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1117,7 +1117,7 @@ func (c *twoPhaseCommitter) calcActionConcurrency( // TODO: Find a self-adaptive way to control the rate limit here. switch action.(type) { case actionPipelinedFlush: - rateLim = min(rateLim, max(1, int(config.PipelinedFlushConcurrency.Load()))) + rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency)) default: if rateLim > config.GetGlobalConfig().CommitterConcurrency { rateLim = config.GetGlobalConfig().CommitterConcurrency diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 35d5c6df4..931a5a219 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -481,7 +480,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end fmt.Sprintf("pipelined-dml-%s", status), fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS), c.store, - int(config.PipelinedResolveConcurrency.Load()), + c.txn.pipelinedResolveLockConcurrency, handler, ) runner.SetStatLogInterval(30 * time.Second) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 7c124a678..14b06fb64 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -109,12 +109,18 @@ func (e *tempLockBufferEntry) trySkipLockingOnRetry(returnValue bool, checkExist return true } +type PipelinedTxnOptions struct { + Enable bool + FlushConcurrency int + ResolveLockConcurrency int +} + // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string - StartTS *uint64 - PipelinedMemDB bool + TxnScope string + StartTS *uint64 + PipelinedTxn PipelinedTxnOptions } // KVTxn contains methods to interact with a TiKV transaction. @@ -170,8 +176,10 @@ type KVTxn struct { forUpdateTSChecks map[string]uint64 - isPipelined bool - pipelinedCancel context.CancelFunc + isPipelined bool + pipelinedCancel context.CancelFunc + pipelinedFlushConcurrency int + pipelinedResolveLockConcurrency int } // NewTiKVTxn creates a new KVTxn. @@ -190,10 +198,12 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, RequestSource: snapshot.RequestSource, } - if !options.PipelinedMemDB { + if !options.PipelinedTxn.Enable { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot) return newTiKVTxn, nil } + newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency + newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err }