Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting concurrency for pipelined flush and resolveLocks #1494

Merged
merged 15 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/gcworker/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module gcworker

go 1.23
go 1.23.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it an auto update from IDE? I think go1.23 is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget the reason to do so, probably because of compiling or CI stuff. I will revert it


require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/rawkv/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module rawkv

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module 1pc_txn

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module async_commit

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module delete_range

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module txnkv

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module pessimistic_txn

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module unsafedestoryrange

go 1.23
go 1.23.2

require github.com/tikv/client-go/v2 v2.0.0

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2
go 1.23

require (
github.com/VividCortex/ewma v1.2.0
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/docker/go-units v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
30 changes: 15 additions & 15 deletions integration_tests/pipelined_memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock {

func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
ctx := context.Background()
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn1, err := s.store.Begin()
s.Nil(err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {

func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
ctx := context.Background()
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand All @@ -161,7 +161,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedMemDBBufferGet() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))

Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
flushed, err := txn.GetMemBuffer().Flush(true)
Expand All @@ -206,7 +206,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
s.Nil(txn.Commit(context.Background()))

// can see it after commit
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
defer txn.Rollback()
val, err := txn.Get(context.Background(), []byte("key1"))
Expand All @@ -222,7 +222,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() {
failpoint.Disable("tikvclient/pipelinedCommitFail")
}()
for i := 0; i < 100; i++ {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
startTS := txn.StartTS()
s.Nil(err)
for j := 0; j < 100; j++ {
Expand All @@ -246,7 +246,7 @@ func (s *testPipelinedMemDBSuite) TestResolveLockRace() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand Down Expand Up @@ -279,7 +279,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
}

// check the result
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
Expand All @@ -290,7 +290,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
}

func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
startTS := txn.StartTS()
s.Nil(err)
keys := make([][]byte, 0, 100)
Expand All @@ -304,7 +304,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
s.Nil(txn.GetMemBuffer().FlushWait())
s.Nil(txn.Rollback())
s.Eventuallyf(func() bool {
txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn())
s.Nil(err)
defer func() { s.Nil(txn.Rollback()) }()
storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
Expand All @@ -322,7 +322,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)

mustFlush := func(txn *transaction.KVTxn) {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {

s.Nil(txn.Commit(context.Background()))

txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("100"), []byte("100")) // snapshot: [0, 1, ..., 99], membuffer: [100]
m, err := txn.BatchGet(context.Background(), [][]byte{[]byte("99"), []byte("100"), []byte("101")})
Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
txn.Rollback()

// empty memdb should also cache the not exist result.
txn, err = s.store.Begin(tikv.WithPipelinedMemDB())
txn, err = s.store.Begin(tikv.WithDefaultPipelinedTxn())
// batch get cache: [99 -> not exist]
m, err = txn.BatchGet(context.Background(), [][]byte{[]byte("99")})
s.Nil(err)
Expand All @@ -433,7 +433,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal)

txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
Expand Down Expand Up @@ -482,7 +482,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
restoreGlobalConfFunc()
}()

txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
txn, err := s.store.Begin(tikv.WithDefaultPipelinedTxn())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
Expand Down
11 changes: 10 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ var (
TiKVValidateReadTSFromPDCount prometheus.Counter
TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge
TiKVStaleRegionFromPDCounter prometheus.Counter
TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram
)

// Label constants.
Expand Down Expand Up @@ -852,14 +853,21 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
Name: "low_resolution_tso_update_interval_seconds",
Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.",
})

TiKVStaleRegionFromPDCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_region_from_pd",
Help: "Counter of stale region from PD",
})
TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pipelined_flush_throttle_seconds",
Help: "Throttle durations of pipelined flushes.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h
})

initShortcuts()
}
Expand Down Expand Up @@ -958,6 +966,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVValidateReadTSFromPDCount)
prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge)
prometheus.MustRegister(TiKVStaleRegionFromPDCounter)
prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
31 changes: 28 additions & 3 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2

defaultPipelinedFlushConcurrency = 128
defaultPipelinedResolveLockConcurrency = 8
defaultPipelinedWriteThrottleRatio = 0.0
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
Expand Down Expand Up @@ -948,10 +952,31 @@ func WithStartTS(startTS uint64) TxnOption {
}
}

// WithPipelinedMemDB creates transaction with pipelined memdb.
func WithPipelinedMemDB() TxnOption {
// WithDefaultPipelinedTxn creates pipelined txn with default parameters
func WithDefaultPipelinedTxn() TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedMemDB = true
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: defaultPipelinedFlushConcurrency,
ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency,
WriteThrottleRatio: defaultPipelinedWriteThrottleRatio,
}
}
}

// WithPipelinedTxn creates pipelined txn with specified parameters
func WithPipelinedTxn(
flushConcurrency,
resolveLockConcurrency int,
writeThrottleRatio float64,
) TxnOption {
return func(st *transaction.TxnOptions) {
st.PipelinedTxn = transaction.PipelinedTxnOptions{
Enable: true,
FlushConcurrency: flushConcurrency,
ResolveLockConcurrency: resolveLockConcurrency,
WriteThrottleRatio: writeThrottleRatio,
}
}
}

Expand Down
22 changes: 17 additions & 5 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,16 +1102,28 @@ func (c *twoPhaseCommitter) doActionOnBatches(
}
return nil
}
rateLim := len(batches)
rateLim := c.calcActionConcurrency(len(batches), action)
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
return batchExecutor.process(batches)
}

func (c *twoPhaseCommitter) calcActionConcurrency(
numBatches int, action twoPhaseCommitAction,
) int {
rateLim := numBatches
// Set rateLim here for the large transaction.
// If the rate limit is too high, tikv will report service is busy.
// If the rate limit is too low, we can't full utilize the tikv's throughput.
// TODO: Find a self-adaptive way to control the rate limit here.
if rateLim > config.GetGlobalConfig().CommitterConcurrency {
rateLim = config.GetGlobalConfig().CommitterConcurrency
switch action.(type) {
case actionPipelinedFlush:
rateLim = min(rateLim, max(1, c.txn.pipelinedFlushConcurrency))
default:
if rateLim > config.GetGlobalConfig().CommitterConcurrency {
rateLim = config.GetGlobalConfig().CommitterConcurrency
}
}
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
return batchExecutor.process(batches)
return rateLim
}

func (c *twoPhaseCommitter) keyValueSize(key, value []byte) int {
Expand Down
3 changes: 1 addition & 2 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
// resolveFlushedLocks resolves all locks in the given range [start, end) with the given status.
// The resolve process is running in another goroutine so this function won't block.
func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
const RESOLVE_CONCURRENCY = 8
var resolved atomic.Uint64
handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
commitTs := uint64(0)
Expand All @@ -481,7 +480,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
fmt.Sprintf("pipelined-dml-%s", status),
fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
c.store,
RESOLVE_CONCURRENCY,
c.txn.pipelinedResolveLockConcurrency,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The throttle flow control works when flushing.

And the resolve-lock phase can also consume many resources, and it may also needs to be controlled by throttle, what about adding a TODO here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throttling the resolve lock goroutine is in our current plan I think. For now we rely on the concurrency to constrain it. If we want a better control of its resource usage we might need to consider a more comprehensive mechanism, e.g. one that helps reduce the overhead of resolving locks of multiple p-txns running in parallel.

handler,
)
runner.SetStatLogInterval(30 * time.Second)
Expand Down
Loading
Loading