Skip to content

Commit

Permalink
do not allow dynamic adjustment of pipelined-txn flush/resolve-lock concurrency; fix it per transaction at Begin time
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Nov 13, 2024
1 parent 222c260 commit 58f362c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 35 deletions.
10 changes: 2 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,17 @@ import (
)

var (
globalConf atomic.Value
PipelinedFlushConcurrency atomic.Uint32
PipelinedResolveConcurrency atomic.Uint32
globalConf atomic.Value
)

const (
// DefStoresRefreshInterval is the default value of StoresRefreshInterval
DefStoresRefreshInterval = 60
DefPipelinedFlushConcurrency = 128
DefPipelinedResolveConcurrency = 8
DefStoresRefreshInterval = 60
)

func init() {
conf := DefaultConfig()
StoreGlobalConfig(&conf)
PipelinedFlushConcurrency.Store(DefPipelinedFlushConcurrency)
PipelinedResolveConcurrency.Store(DefPipelinedResolveConcurrency)
}

// Config contains configuration options.
Expand Down
30 changes: 15 additions & 15 deletions integration_tests/pipelined_memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock {

func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
ctx := context.Background()
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn1, err := s.store.Begin()
s.Nil(err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {

func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
ctx := context.Background()
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand All @@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))

Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
flushed, err := txn.GetMemBuffer().Flush(true)
Expand All @@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
s.Nil(txn.Commit(context.Background()))

// can see it after commit
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
defer txn.Rollback()
val, err := txn.Get(context.Background(), []byte("key1"))
Expand All @@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() {
failpoint.Disable("tikvclient/pipelinedCommitFail")
}()
for i := 0; i < 100; i++ {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
startTS := txn.StartTS()
s.Nil(err)
for j := 0; j < 100; j++ {
Expand All @@ -246,7 +246,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand Down Expand Up @@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
}

// check the result
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand All @@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
startTS := txn.StartTS()
s.Nil(err)
keys := make([][]byte, 0, 100)
Expand All @@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
s.Nil(txn.GetMemBuffer().FlushWait())
s.Nil(txn.Rollback())
s.Eventuallyf(func() bool {
txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn())
s.Nil(err)
defer func() { s.Nil(txn.Rollback()) }()
storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
Expand All @@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)

mustFlush := func(txn *transaction.KVTxn) {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {

s.Nil(txn.Commit(context.Background()))

txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100]
m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")})
Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
txn.Rollback()

// empty memdb should also cache the not exist result.
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
// batch get cache: [99 -> not exist]
m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")})
s.Nil(err)
Expand All @@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal)

txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
Expand Down Expand Up @@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
restoreGlobalConfFunc()
}()

txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
Expand Down
24 changes: 21 additions & 3 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2

defaultPipelinedFlushConcurrency = 128
defaultPipelinedResolveLockConcurrency = 8
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
Expand Down Expand Up @@ -944,10 +947,25 @@ func WithStartTS(startTS uint64) TxnOption {
}
}

// WithPipelinedMemDB creates transaction with pipelined memdb.
func WithPipelinedMemDB() TxnOption {
// WithDefaultPipelinedTxn creates pipelined txn with default parameters
func WithDefaultPipelinedTxn() TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedMemDB = true
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: defaultPipelinedFlushConcurrency,
ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency,
}
}
}

// WithPipelinedTxn creates pipelined txn with specified parameters
func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: flushConcurrency,
ResolveLockConcurrency: resolveLockConcurrency,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func (c *twoPhaseCommitter) calcActionConcurrency(
// TODO: Find a self-adaptive way to control the rate limit here.
switch action.(type) {
case actionPipelinedFlush:
rateLim = min(rateLim, max(1, int(config.PipelinedFlushConcurrency.Load())))
rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency))
default:
if rateLim > config.GetGlobalConfig().CommitterConcurrency {
rateLim = config.GetGlobalConfig().CommitterConcurrency
Expand Down
3 changes: 1 addition & 2 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
Expand Down Expand Up @@ -481,7 +480,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
fmt.Sprintf("pipelined-dml-%s", status),
fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
c.store,
int(config.PipelinedResolveConcurrency.Load()),
c.txn.pipelinedResolveLockConcurrency,
handler,
)
runner.SetStatLogInterval(30 * time.Second)
Expand Down
22 changes: 16 additions & 6 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,18 @@ func (e *tempLockBufferEntry) trySkipLockingOnRetry(returnValue bool, checkExist
return true
}

// PipelinedTxnOptions carries the pipelined-transaction settings chosen
// at Begin time; the concurrencies are fixed for the whole transaction.
type PipelinedTxnOptions struct {
	// Enable turns on the pipelined protocol for this transaction.
	Enable bool
	// FlushConcurrency is the upper bound on the rate limit used when
	// flushing the pipelined memdb to the stores.
	FlushConcurrency int
	// ResolveLockConcurrency is the concurrency of the runner that
	// resolves flushed locks after the transaction finishes.
	ResolveLockConcurrency int
}

// TxnOptions indicates the option when beginning a transaction.
// TxnOptions are set by the TxnOption values passed to Begin
type TxnOptions struct {
TxnScope string
StartTS *uint64
PipelinedMemDB bool
TxnScope string
StartTS *uint64
PipelinedTxn PipelinedTxnOptions
}

// KVTxn contains methods to interact with a TiKV transaction.
Expand Down Expand Up @@ -170,8 +176,10 @@ type KVTxn struct {

forUpdateTSChecks map[string]uint64

isPipelined bool
pipelinedCancel context.CancelFunc
isPipelined bool
pipelinedCancel context.CancelFunc
pipelinedFlushConcurrency int
pipelinedResolveLockConcurrency int
}

// NewTiKVTxn creates a new KVTxn.
Expand All @@ -190,10 +198,12 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
RequestSource: snapshot.RequestSource,
}
if !options.PipelinedMemDB {
if !options.PipelinedTxn.Enable {
newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot)
return newTiKVTxn, nil
}
newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency
newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency
if err := newTiKVTxn.InitPipelinedMemDB(); err != nil {
return nil, err
}
Expand Down

0 comments on commit 58f362c

Please sign in to comment.