client: Cache tikv request in tidb client side (#1098)
* impl priority queue

Signed-off-by: bufferflies <[email protected]>

* replace priority queue

Signed-off-by: bufferflies <[email protected]>

* cache request in tidb side

Signed-off-by: bufferflies <[email protected]>

* fix gosimple

Signed-off-by: bufferflies <[email protected]>

* impl priority

Signed-off-by: bufferflies <[email protected]>

* pass ut

Signed-off-by: bufferflies <[email protected]>

* add

Signed-off-by: bufferflies <[email protected]>

* remove request if the request has been canceled

Signed-off-by: bufferflies <[email protected]>

* remove request if it has been canceled

Signed-off-by: bufferflies <[email protected]>

* add comment for cancel

Signed-off-by: bufferflies <[email protected]>

* not make the loop busy

Signed-off-by: bufferflies <[email protected]>

* lint

Signed-off-by: bufferflies <[email protected]>

* revert busy loop

Signed-off-by: bufferflies <[email protected]>

* add unit test

Signed-off-by: bufferflies <[email protected]>

* not limit high priority test

Signed-off-by: bufferflies <[email protected]>

* pass lint

Signed-off-by: bufferflies <[email protected]>

* support all

Signed-off-by: bufferflies <[email protected]>

* add comment

Signed-off-by: bufferflies <[email protected]>

* squash

Signed-off-by: bufferflies <[email protected]>

* revert all to All

Signed-off-by: bufferflies <[email protected]>

* remove index from entry

Signed-off-by: bufferflies <[email protected]>

* make fail reasons more clear

Signed-off-by: bufferflies <[email protected]>

---------

Signed-off-by: bufferflies <[email protected]>
bufferflies authored Feb 21, 2024
1 parent 0f8c594 commit 824302a
Showing 7 changed files with 306 additions and 97 deletions.
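The core idea of the change: instead of handing every request straight to the gRPC stream, the batch client now parks pending entries in a client-side priority queue and drains them under a per-connection concurrency budget. The following standalone Go sketch illustrates only the queueing idea; it is a simplification built on container/heap, not the queue implemented elsewhere in this commit (the entry fields and payload type here are invented).

package main

import (
	"container/heap"
	"fmt"
)

// entry stands in for batchCommandsEntry: just a priority and a payload.
type entry struct {
	pri uint64
	req string
}

// queue is a max-heap over entry priorities, so high-priority requests
// are drained first.
type queue []*entry

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].pri > q[j].pri }
func (q queue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x any)        { *q = append(*q, x.(*entry)) }
func (q *queue) Pop() any {
	old := *q
	n := len(old)
	e := old[n-1]
	old[n-1] = nil // drop the reference so the GC can reclaim the entry
	*q = old[:n-1]
	return e
}

func main() {
	q := &queue{}
	heap.Push(q, &entry{pri: 1, req: "normal"})
	heap.Push(q, &entry{pri: 16, req: "high"})
	heap.Push(q, &entry{pri: 1, req: "normal-2"})
	for q.Len() > 0 {
		// "high" comes out first; ties between equal priorities are unordered.
		fmt.Println(heap.Pop(q).(*entry).req)
	}
}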
7 changes: 6 additions & 1 deletion config/client.go
@@ -36,6 +36,7 @@ package config

import (
"fmt"
"math"
"time"

"google.golang.org/grpc/encoding/gzip"
@@ -89,6 +90,9 @@ type TiKVClient struct {
// TTLRefreshedTxnSize controls whether a transaction should update its TTL or not.
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"`
// MaxConcurrencyRequestLimit is the max number of concurrent requests that can be sent to TiKV.
// 0 means auto adjust by feedback.
MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"`
}

// AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable.
@@ -158,7 +162,8 @@ func DefaultTiKVClient() TiKVClient {
},
CoprReqTimeout: 60 * time.Second,

ResolveLockLiteThreshold: 16,
ResolveLockLiteThreshold: 16,
MaxConcurrencyRequestLimit: math.MaxInt64,
}
}

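A hedged usage sketch for the new knob: how an embedding application might lower the limit from its effectively-unlimited default. It assumes client-go's config.UpdateGlobal and GetGlobalConfig helpers; only the MaxConcurrencyRequestLimit field itself comes from this diff, and the value 1024 is purely illustrative.

package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/config"
)

func main() {
	// The default is math.MaxInt64 (effectively unlimited); the field's
	// comment documents 0 as "auto-adjust by feedback".
	config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxConcurrencyRequestLimit = 1024 // hypothetical cap
	})
	fmt.Println(config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit)
}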
1 change: 0 additions & 1 deletion examples/gcworker/go.mod
@@ -17,7 +17,6 @@ require (
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
4 changes: 3 additions & 1 deletion internal/client/client.go
@@ -318,6 +318,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
@@ -623,10 +624,11 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R

// TiDB RPC server supports batch RPC, but the batch connection would send heartbeats, which is unnecessary since
// requests to TiDB are not high frequency.
pri := req.GetResourceControlContext().GetOverridePriority()
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
defer trace.StartRegion(ctx, req.Type.String()).End()
return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)
return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout, pri)
}
}

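The pri value above comes from the request's resource control context, so the priority originates with the caller. A sketch of how a request could be marked high priority follows; the getter chain matches this diff, while the construction of the context is an assumption about the kvproto API.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

func main() {
	reqCtx := &kvrpcpb.Context{}
	// Anything at or above highTaskPriority (10 in this diff) is treated as
	// high priority and bypasses the concurrency limit.
	reqCtx.ResourceControlContext = &kvrpcpb.ResourceControlContext{
		OverridePriority: 16,
	}
	fmt.Println(reqCtx.GetResourceControlContext().GetOverridePriority())
}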
130 changes: 93 additions & 37 deletions internal/client/client_batch.go
@@ -70,15 +70,15 @@ type batchCommandsEntry struct {
// canceled indicates whether the request has been canceled.
canceled int32
err error
pri uint64
}

func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

// TODO: implement by the request priority.
func (b *batchCommandsEntry) priority() int {
return 0
func (b *batchCommandsEntry) priority() uint64 {
return b.pri
}

func (b *batchCommandsEntry) error(err error) {
@@ -107,33 +107,58 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
b.entries.Push(entry)
}

// build builds BatchCommandsRequests and calls collect() for each valid entry.
const highTaskPriority = 10

func (b *batchCommandsBuilder) hasHighPriorityTask() bool {
return b.entries.highestPriority() >= highTaskPriority
}

// buildWithLimit builds BatchCommandsRequests subject to the given limit.
// The highest-priority tasks don't consume any of the limit,
// so the limit only applies to normal tasks.
// The first return value is the request that doesn't need forwarding.
// The second is a map that maps forwarded hosts to requests.
func (b *batchCommandsBuilder) build(
collect func(id uint64, e *batchCommandsEntry),
func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint64, e *batchCommandsEntry),
) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
for _, entry := range b.entries.All() {
e := entry.(*batchCommandsEntry)
if e.isCanceled() {
continue
count := int64(0)
build := func(reqs []Item) {
for _, e := range reqs {
e := e.(*batchCommandsEntry)
if e.isCanceled() {
continue
}
if e.priority() < highTaskPriority {
count++
}

if collect != nil {
collect(b.idAlloc, e)
}
if e.forwardedHost == "" {
b.requestIDs = append(b.requestIDs, b.idAlloc)
b.requests = append(b.requests, e.req)
} else {
batchReq, ok := b.forwardingReqs[e.forwardedHost]
if !ok {
batchReq = &tikvpb.BatchCommandsRequest{}
b.forwardingReqs[e.forwardedHost] = batchReq
}
batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc)
batchReq.Requests = append(batchReq.Requests, e.req)
}
b.idAlloc++
}
if collect != nil {
collect(b.idAlloc, e)
}
for (count < limit && b.entries.Len() > 0) || b.hasHighPriorityTask() {
n := limit
if limit == 0 {
n = 1
}
if e.forwardedHost == "" {
b.requestIDs = append(b.requestIDs, b.idAlloc)
b.requests = append(b.requests, e.req)
} else {
batchReq, ok := b.forwardingReqs[e.forwardedHost]
if !ok {
batchReq = &tikvpb.BatchCommandsRequest{}
b.forwardingReqs[e.forwardedHost] = batchReq
}
batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc)
batchReq.Requests = append(batchReq.Requests, e.req)
reqs := b.entries.Take(int(n))
if len(reqs) == 0 {
break
}
b.idAlloc++
build(reqs)
}
var req *tikvpb.BatchCommandsRequest
if len(b.requests) > 0 {
@@ -145,20 +170,22 @@ func (b *batchCommandsBuilder) build(
return req, b.forwardingReqs
}
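To make the limit semantics concrete, here is a self-contained approximation of buildWithLimit's accounting (an illustration, not the function above): entries below highTaskPriority are charged against the budget, higher ones are not, and canceled entries are skipped.

package main

import (
	"fmt"
	"sort"
)

const highTaskPriority = 10

type entry struct {
	pri      uint64
	canceled bool
}

// drainWithLimit mimics buildWithLimit's accounting over an already
// prioritized queue: only entries below highTaskPriority count toward
// the limit, so high-priority work can always be flushed.
func drainWithLimit(pending []entry, limit int64) []entry {
	// A priority queue would hand entries over highest-priority first.
	sort.Slice(pending, func(i, j int) bool { return pending[i].pri > pending[j].pri })
	var batch []entry
	var count int64
	for _, e := range pending {
		if e.canceled {
			continue
		}
		if e.pri < highTaskPriority {
			if count >= limit {
				break // normal entries beyond the budget stay queued
			}
			count++
		}
		batch = append(batch, e)
	}
	return batch
}

func main() {
	pending := []entry{{pri: 1}, {pri: 16}, {pri: 1, canceled: true}, {pri: 1}, {pri: 1}}
	// The high-priority entry plus two normal ones: [{16 false} {1 false} {1 false}]
	fmt.Println(drainWithLimit(pending, 2))
}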

// cancel fails all pending requests; it is only used in tests.
func (b *batchCommandsBuilder) cancel(e error) {
for _, entry := range b.entries.All() {
for _, entry := range b.entries.all() {
entry.(*batchCommandsEntry).error(e)
}
b.entries.reset()
}

// reset resets the builder to the initial state.
// It should be called before collecting a new batch.
func (b *batchCommandsBuilder) reset() {
b.entries.clean()
// NOTE: We can't simply set entries = entries[:0] here.
// The data in the cap part of the slice would reference the prewrite keys whose
// underlying memory is borrowed from memdb. Those references prevent the GC from
// releasing the memdb, leading to serious memory leaks in large-transaction cases.
b.entries.Reset()
for i := 0; i < len(b.requests); i++ {
b.requests[i] = nil
}
@@ -336,8 +363,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime)
}
}
a.pendingRequests.Observe(float64(len(a.batchCommandsCh)))
a.batchSize.Observe(float64(a.reqBuilder.len()))
a.pendingRequests.Observe(float64(len(a.batchCommandsCh) + a.reqBuilder.len()))
length := a.reqBuilder.len()
if uint(length) == 0 {
// The batch command channel is closed.
@@ -354,6 +380,11 @@
}
}

const (
SendFailedReasonNoAvailableLimit = "concurrency limit exceeded"
SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
)

func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
@@ -366,37 +397,50 @@
cli *batchCommandsClient
target string
)
reasons := make([]string, 0)
hasHighPriorityTask := a.reqBuilder.hasHighPriorityTask()
for i := 0; i < len(a.batchCommandsClients); i++ {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
target = a.batchCommandsClients[a.index].target
// The lock protects the batchCommandsClient from being closed while it's in use.
if a.batchCommandsClients[a.index].tryLockForSend() {
cli = a.batchCommandsClients[a.index]
break
c := a.batchCommandsClients[a.index]
if hasHighPriorityTask || c.available() > 0 {
if c.tryLockForSend() {
cli = c
break
} else {
reasons = append(reasons, SendFailedReasonTryLockForSendFail)
}
} else {
reasons = append(reasons, SendFailedReasonNoAvailableLimit)
}
}
if cli == nil {
logutil.BgLogger().Warn("no available connections", zap.String("target", target))
logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
metrics.TiKVNoAvailableConnectionCounter.Inc()

// Please ensure the error is handled in region cache correctly.
a.reqBuilder.cancel(errors.New("no available connections"))
return
}
defer cli.unlockForSend()

req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
available := cli.available()
batch := 0
req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) {
cli.batched.Store(id, e)
cli.sent.Add(1)
if trace.IsEnabled() {
trace.Log(e.ctx, "rpc", "send")
}
})
if req != nil {
batch += len(req.RequestIds)
cli.send("", req)
}
for forwardedHost, req := range forwardingReqs {
batch += len(req.RequestIds)
cli.send(forwardedHost, req)
}
if batch > 0 {
a.batchSize.Observe(float64(batch))
}
}
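A compact model of the selection loop above (simplified, with invented types): connections are probed round-robin, a connection with no remaining budget is skipped unless a high-priority task is waiting, and every skipped connection records one of the two reasons defined before getClientAndSend.

package main

import "fmt"

type conn struct {
	name      string
	available int64
	locked    bool
}

// pick walks the connections round-robin from start, honoring the budget
// unless a high-priority task must go out, and collects skip reasons.
func pick(conns []conn, start int, hasHighPriorityTask bool) (*conn, []string) {
	reasons := make([]string, 0)
	for i := 0; i < len(conns); i++ {
		c := &conns[(start+i)%len(conns)]
		if hasHighPriorityTask || c.available > 0 {
			if !c.locked { // stands in for tryLockForSend
				return c, reasons
			}
			reasons = append(reasons, "tryLockForSend fail")
		} else {
			reasons = append(reasons, "concurrency limit exceeded")
		}
	}
	return nil, reasons
}

func main() {
	conns := []conn{{name: "a", available: 0}, {name: "b", available: 2, locked: true}, {name: "c", available: 1}}
	c, reasons := pick(conns, 0, false)
	fmt.Println(c.name, reasons) // c [concurrency limit exceeded tryLockForSend fail]
}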

type tryLock struct {
@@ -507,12 +551,20 @@ type batchCommandsClient struct {
closed int32
// tryLock protects client when re-create the streaming.
tryLock
// sent is the number of in-flight requests currently being processed by the TiKV server.
sent atomic.Int64
// maxConcurrencyRequestLimit is the max allowed number of in-flight requests sent to TiKV.
maxConcurrencyRequestLimit atomic.Int64
}

func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) available() int64 {
return c.maxConcurrencyRequestLimit.Load() - c.sent.Load()
}
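This subtraction is the whole flow-control mechanism: sent rises when a request is collected into a batch (getClientAndSend above) and falls when its response arrives or it fails (batchRecvLoop and failPendingRequests below), so available is the remaining per-connection budget. A minimal sketch of the invariant, using only sync/atomic:

package main

import (
	"fmt"
	"sync/atomic"
)

// window mirrors the sent/maxConcurrencyRequestLimit pair: a simple
// sliding budget of in-flight requests per connection.
type window struct {
	sent  atomic.Int64
	limit atomic.Int64
}

func (w *window) available() int64 { return w.limit.Load() - w.sent.Load() }

func main() {
	w := &window{}
	w.limit.Store(2)

	w.sent.Add(1)              // a request is batched and sent
	fmt.Println(w.available()) // 1

	w.sent.Add(-1)             // its response arrives (or it fails)
	fmt.Println(w.available()) // 2
}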

func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
err := c.initBatchClient(forwardedHost)
if err != nil {
@@ -549,6 +601,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
c.batched.Delete(id)
c.sent.Add(-1)
entry.error(err)
return true
})
@@ -661,6 +714,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
entry.res <- responses[i]
}
c.batched.Delete(requestID)
c.sent.Add(-1)
}

transportLayerLoad := resp.GetTransportLayerLoad()
@@ -779,6 +833,7 @@ func sendBatchRequest(
batchConn *batchConn,
req *tikvpb.BatchCommandsRequest_Request,
timeout time.Duration,
priority uint64,
) (*tikvrpc.Response, error) {
entry := &batchCommandsEntry{
ctx: ctx,
Expand All @@ -787,6 +842,7 @@ func sendBatchRequest(
forwardedHost: forwardedHost,
canceled: 0,
err: nil,
pri: priority,
}
timer := time.NewTimer(timeout)
defer timer.Stop()
(The remaining 3 changed files are not rendered here.)