Skip to content

Commit

Permalink
feat: allow setting resolved lock concurrency for pipelined dml
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Nov 13, 2024
1 parent 4618da4 commit 8de9ca0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
11 changes: 8 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,23 @@ import (
)

var (
globalConf atomic.Value
PipelinedFlushConcurrency atomic.Uint32
globalConf atomic.Value
PipelinedFlushConcurrency atomic.Uint32
PipelinedResolveConcurrency atomic.Uint32
)

const (
// DefStoresRefreshInterval is the default value of StoresRefreshInterval
DefStoresRefreshInterval = 60
DefStoresRefreshInterval = 60
DefPipelinedFlushConcurrency = 128
DefPipelinedResolveConcurrency = 8
)

func init() {
conf := DefaultConfig()
StoreGlobalConfig(&conf)
PipelinedFlushConcurrency.Store(DefPipelinedFlushConcurrency)
PipelinedResolveConcurrency.Store(DefPipelinedResolveConcurrency)
}

// Config contains configuration options.
Expand Down
4 changes: 2 additions & 2 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
Expand Down Expand Up @@ -454,7 +455,6 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
// resolveFlushedLocks resolves all locks in the given range [start, end) with the given status.
// The resolve process is running in another goroutine so this function won't block.
func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
const RESOLVE_CONCURRENCY = 8
var resolved atomic.Uint64
handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
commitTs := uint64(0)
Expand All @@ -481,7 +481,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
fmt.Sprintf("pipelined-dml-%s", status),
fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
c.store,
RESOLVE_CONCURRENCY,
int(config.PipelinedResolveConcurrency.Load()),
handler,
)
runner.SetStatLogInterval(30 * time.Second)
Expand Down

0 comments on commit 8de9ca0

Please sign in to comment.