From 08aa706635047b5f31681a7b153c2721fdfe1834 Mon Sep 17 00:00:00 2001
From: zyguan
Date: Fri, 22 Mar 2024 11:54:22 +0800
Subject: [PATCH 1/4] test: make TestReplicaReadAccessPathByGenError stable
 (#1243)

* test: make TestReplicaReadAccessPathByGenError stable

Signed-off-by: zyguan

* reduce maxAccessErrCnt

Signed-off-by: zyguan

---------

Signed-off-by: zyguan
---
 internal/locate/replica_selector_test.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go
index ebce8dd872..4572319ab0 100644
--- a/internal/locate/replica_selector_test.go
+++ b/internal/locate/replica_selector_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config"
@@ -31,6 +32,7 @@ import (
 	"github.com/tikv/client-go/v2/tikvrpc"
 	"github.com/tikv/client-go/v2/util/israce"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 )
 
 type testReplicaSelectorSuite struct {
@@ -2508,12 +2510,16 @@ func TestReplicaReadAccessPathByLearnerCase(t *testing.T) {
 func TestReplicaReadAccessPathByGenError(t *testing.T) {
 	s := new(testReplicaSelectorSuite)
 	s.SetupTest(t)
-	defer s.TearDownTest()
+	defer func(lv zapcore.Level) {
+		log.SetLevel(lv)
+		s.TearDownTest()
+	}(log.GetLevel())
+	log.SetLevel(zapcore.ErrorLevel)
 
-	maxAccessErrCnt := 6
+	maxAccessErrCnt := 4
 	if israce.RaceEnabled {
 		// When run this test with race, it will take a long time, so we reduce the maxAccessErrCnt to 3 to speed up test to avoid timeout.
-		maxAccessErrCnt = 3
+		maxAccessErrCnt = 2
 	}
 	totalValidCaseCount := 0
 	totalCaseCount := 0

From 05aaba6cc6f7c9e7f4d6e29b8d0c1acdca810202 Mon Sep 17 00:00:00 2001
From: zyguan
Date: Fri, 22 Mar 2024 15:07:37 +0800
Subject: [PATCH 2/4] tikvrpc: avoid data race on `XxxRequest.Context` (#1242)

* tikvrpc: avoid data race on `XxxRequest.Context`

Signed-off-by: zyguan

* fix grammar of codegen comment

Signed-off-by: zyguan

* address comments

Signed-off-by: zyguan

* check diff of go generate

Signed-off-by: zyguan

* fix a typo

Signed-off-by: zyguan

---------

Signed-off-by: zyguan
---
 .github/workflows/test.yml               |   5 +
 internal/locate/replica_selector_test.go |   2 +-
 tikvrpc/cmds_generated.go                | 384 +++++++++++++++++++++++
 tikvrpc/gen.sh                           |  99 ++++++
 tikvrpc/tikvrpc.go                       | 117 ++-----
 tikvrpc/tikvrpc_test.go                  |  90 ++++++
 6 files changed, 609 insertions(+), 88 deletions(-)
 create mode 100644 tikvrpc/cmds_generated.go
 create mode 100755 tikvrpc/gen.sh

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 1c2428c976..1491bd6046 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -44,6 +44,11 @@ jobs:
       with:
         go-version: 1.21.6
 
+      - name: Go generate and check diff
+        run: |
+          go generate ./...
+          git diff --exit-code
+
       - name: Lint
         uses: golangci/golangci-lint-action@v3
         with:
diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go
index 4572319ab0..bb6cd01a60 100644
--- a/internal/locate/replica_selector_test.go
+++ b/internal/locate/replica_selector_test.go
@@ -2518,7 +2518,7 @@ func TestReplicaReadAccessPathByGenError(t *testing.T) {
 
 	maxAccessErrCnt := 4
 	if israce.RaceEnabled {
-		// When run this test with race, it will take a long time, so we reduce the maxAccessErrCnt to 3 to speed up test to avoid timeout.
+ // When run this test with race, it will take a long time, so we reduce the maxAccessErrCnt to 2 to speed up test to avoid timeout. maxAccessErrCnt = 2 } totalValidCaseCount := 0 diff --git a/tikvrpc/cmds_generated.go b/tikvrpc/cmds_generated.go new file mode 100644 index 0000000000..095b0e5c13 --- /dev/null +++ b/tikvrpc/cmds_generated.go @@ -0,0 +1,384 @@ +// Code generated by gen.sh. DO NOT EDIT. + +package tikvrpc + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) + +func patchCmdCtx(req *Request, cmd CmdType, ctx *kvrpcpb.Context) bool { + switch cmd { + case CmdGet: + if req.rev == 0 { + req.Get().Context = ctx + } else { + cmd := *req.Get() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdScan: + if req.rev == 0 { + req.Scan().Context = ctx + } else { + cmd := *req.Scan() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdPrewrite: + if req.rev == 0 { + req.Prewrite().Context = ctx + } else { + cmd := *req.Prewrite() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdPessimisticLock: + if req.rev == 0 { + req.PessimisticLock().Context = ctx + } else { + cmd := *req.PessimisticLock() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdPessimisticRollback: + if req.rev == 0 { + req.PessimisticRollback().Context = ctx + } else { + cmd := *req.PessimisticRollback() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCommit: + if req.rev == 0 { + req.Commit().Context = ctx + } else { + cmd := *req.Commit() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCleanup: + if req.rev == 0 { + req.Cleanup().Context = ctx + } else { + cmd := *req.Cleanup() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdBatchGet: + if req.rev == 0 { + req.BatchGet().Context = ctx + } else { + cmd := *req.BatchGet() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdBatchRollback: + if req.rev == 0 { + req.BatchRollback().Context = ctx + } else { + cmd := *req.BatchRollback() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdScanLock: + if req.rev == 0 { + req.ScanLock().Context = ctx + } else { + cmd := *req.ScanLock() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdResolveLock: + if req.rev == 0 { + req.ResolveLock().Context = ctx + } else { + cmd := *req.ResolveLock() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdGC: + if req.rev == 0 { + req.GC().Context = ctx + } else { + cmd := *req.GC() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdDeleteRange: + if req.rev == 0 { + req.DeleteRange().Context = ctx + } else { + cmd := *req.DeleteRange() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawGet: + if req.rev == 0 { + req.RawGet().Context = ctx + } else { + cmd := *req.RawGet() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawBatchGet: + if req.rev == 0 { + req.RawBatchGet().Context = ctx + } else { + cmd := *req.RawBatchGet() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawPut: + if req.rev == 0 { + req.RawPut().Context = ctx + } else { + cmd := *req.RawPut() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawBatchPut: + if req.rev == 0 { + req.RawBatchPut().Context = ctx + } else { + cmd := *req.RawBatchPut() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawDelete: + if req.rev == 0 { + req.RawDelete().Context = ctx + } else { + cmd := *req.RawDelete() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawBatchDelete: + if req.rev == 0 { + 
req.RawBatchDelete().Context = ctx + } else { + cmd := *req.RawBatchDelete() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawDeleteRange: + if req.rev == 0 { + req.RawDeleteRange().Context = ctx + } else { + cmd := *req.RawDeleteRange() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawScan: + if req.rev == 0 { + req.RawScan().Context = ctx + } else { + cmd := *req.RawScan() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawGetKeyTTL: + if req.rev == 0 { + req.RawGetKeyTTL().Context = ctx + } else { + cmd := *req.RawGetKeyTTL() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawCompareAndSwap: + if req.rev == 0 { + req.RawCompareAndSwap().Context = ctx + } else { + cmd := *req.RawCompareAndSwap() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRawChecksum: + if req.rev == 0 { + req.RawChecksum().Context = ctx + } else { + cmd := *req.RawChecksum() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdUnsafeDestroyRange: + if req.rev == 0 { + req.UnsafeDestroyRange().Context = ctx + } else { + cmd := *req.UnsafeDestroyRange() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRegisterLockObserver: + if req.rev == 0 { + req.RegisterLockObserver().Context = ctx + } else { + cmd := *req.RegisterLockObserver() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCheckLockObserver: + if req.rev == 0 { + req.CheckLockObserver().Context = ctx + } else { + cmd := *req.CheckLockObserver() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdRemoveLockObserver: + if req.rev == 0 { + req.RemoveLockObserver().Context = ctx + } else { + cmd := *req.RemoveLockObserver() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdPhysicalScanLock: + if req.rev == 0 { + req.PhysicalScanLock().Context = ctx + } else { + cmd := *req.PhysicalScanLock() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCop: + if req.rev == 0 { + req.Cop().Context = ctx + } else { + cmd := *req.Cop() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdBatchCop: + if req.rev == 0 { + req.BatchCop().Context = ctx + } else { + cmd := *req.BatchCop() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdMvccGetByKey: + if req.rev == 0 { + req.MvccGetByKey().Context = ctx + } else { + cmd := *req.MvccGetByKey() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdMvccGetByStartTs: + if req.rev == 0 { + req.MvccGetByStartTs().Context = ctx + } else { + cmd := *req.MvccGetByStartTs() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdSplitRegion: + if req.rev == 0 { + req.SplitRegion().Context = ctx + } else { + cmd := *req.SplitRegion() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdTxnHeartBeat: + if req.rev == 0 { + req.TxnHeartBeat().Context = ctx + } else { + cmd := *req.TxnHeartBeat() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCheckTxnStatus: + if req.rev == 0 { + req.CheckTxnStatus().Context = ctx + } else { + cmd := *req.CheckTxnStatus() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdCheckSecondaryLocks: + if req.rev == 0 { + req.CheckSecondaryLocks().Context = ctx + } else { + cmd := *req.CheckSecondaryLocks() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdFlashbackToVersion: + if req.rev == 0 { + req.FlashbackToVersion().Context = ctx + } else { + cmd := *req.FlashbackToVersion() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdPrepareFlashbackToVersion: + if req.rev 
== 0 { + req.PrepareFlashbackToVersion().Context = ctx + } else { + cmd := *req.PrepareFlashbackToVersion() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdFlush: + if req.rev == 0 { + req.Flush().Context = ctx + } else { + cmd := *req.Flush() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + case CmdBufferBatchGet: + if req.rev == 0 { + req.BufferBatchGet().Context = ctx + } else { + cmd := *req.BufferBatchGet() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ + default: + return false + } + return true +} diff --git a/tikvrpc/gen.sh b/tikvrpc/gen.sh new file mode 100755 index 0000000000..0911c424a8 --- /dev/null +++ b/tikvrpc/gen.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +# Copyright 2024 TiKV Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +output="cmds_generated.go" + +cat < $output +// Code generated by gen.sh. DO NOT EDIT. + +package tikvrpc + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) +EOF + +cmds=( + Get + Scan + Prewrite + PessimisticLock + PessimisticRollback + Commit + Cleanup + BatchGet + BatchRollback + ScanLock + ResolveLock + GC + DeleteRange + RawGet + RawBatchGet + RawPut + RawBatchPut + RawDelete + RawBatchDelete + RawDeleteRange + RawScan + RawGetKeyTTL + RawCompareAndSwap + RawChecksum + UnsafeDestroyRange + RegisterLockObserver + CheckLockObserver + RemoveLockObserver + PhysicalScanLock + Cop + BatchCop + MvccGetByKey + MvccGetByStartTs + SplitRegion + TxnHeartBeat + CheckTxnStatus + CheckSecondaryLocks + FlashbackToVersion + PrepareFlashbackToVersion + Flush + BufferBatchGet +) + +cat <> $output + +func patchCmdCtx(req *Request, cmd CmdType, ctx *kvrpcpb.Context) bool { + switch cmd { +EOF + +for cmd in "${cmds[@]}"; do +cat <> $output + case Cmd${cmd}: + if req.rev == 0 { + req.${cmd}().Context = ctx + } else { + cmd := *req.${cmd}() + cmd.Context = ctx + req.Req = &cmd + } + req.rev++ +EOF +done + +cat <> $output + default: + return false + } + return true +} +EOF diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index ddc97d153e..50821516e8 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -85,7 +85,7 @@ const ( CmdRawBatchDelete CmdRawDeleteRange CmdRawScan - CmdGetKeyTTL + CmdRawGetKeyTTL CmdRawCompareAndSwap CmdRawChecksum @@ -118,6 +118,11 @@ const ( CmdEmpty CmdType = 3072 + iota ) +// CmdType aliases. +const ( + CmdGetKeyTTL = CmdRawGetKeyTTL +) + func (t CmdType) String() string { switch t { case CmdGet: @@ -164,6 +169,10 @@ func (t CmdType) String() string { return "RawScan" case CmdRawChecksum: return "RawChecksum" + case CmdRawGetKeyTTL: + return "RawGetKeyTTL" + case CmdRawCompareAndSwap: + return "RawCompareAndSwap" case CmdUnsafeDestroyRange: return "UnsafeDestroyRange" case CmdRegisterLockObserver: @@ -225,7 +234,11 @@ func (t CmdType) String() string { // Request wraps all kv/coprocessor requests. type Request struct { Type CmdType - Req interface{} + // Req is one of the request type defined in kvrpcpb. 
+ // + // WARN: It may be read concurrently in batch-send-loop, so you should ONLY modify it via `AttachContext`, + // otherwise there could be a risk of data race. + Req interface{} kvrpcpb.Context ReadReplicaScope string // remove txnScope after tidb removed txnScope @@ -244,6 +257,9 @@ type Request struct { ReadType string // InputRequestSource is the input source of the request, if it's not empty, the final RequestSource sent to store will be attached with the retry info. InputRequestSource string + + // rev represents the revision of the request, it's increased when `Req.Context` gets patched. + rev uint32 } // NewRequest returns new kv rpc request. @@ -731,104 +747,31 @@ type MPPStreamResponse struct { Lease } +//go:generate bash gen.sh + // AttachContext sets the request context to the request, // return false if encounter unknown request type. // Parameter `rpcCtx` use `kvrpcpb.Context` instead of `*kvrpcpb.Context` to avoid concurrent modification by shallow copy. func AttachContext(req *Request, rpcCtx kvrpcpb.Context) bool { ctx := &rpcCtx + cmd := req.Type + // CmdCopStream and CmdCop share the same request type. + if cmd == CmdCopStream { + cmd = CmdCop + } + if patchCmdCtx(req, cmd, ctx) { + return true + } switch req.Type { - case CmdGet: - req.Get().Context = ctx - case CmdScan: - req.Scan().Context = ctx - case CmdPrewrite: - req.Prewrite().Context = ctx - case CmdPessimisticLock: - req.PessimisticLock().Context = ctx - case CmdPessimisticRollback: - req.PessimisticRollback().Context = ctx - case CmdCommit: - req.Commit().Context = ctx - case CmdCleanup: - req.Cleanup().Context = ctx - case CmdBatchGet: - req.BatchGet().Context = ctx - case CmdBatchRollback: - req.BatchRollback().Context = ctx - case CmdScanLock: - req.ScanLock().Context = ctx - case CmdResolveLock: - req.ResolveLock().Context = ctx - case CmdGC: - req.GC().Context = ctx - case CmdDeleteRange: - req.DeleteRange().Context = ctx - case CmdRawGet: - req.RawGet().Context = ctx - case CmdRawBatchGet: - req.RawBatchGet().Context = ctx - case CmdRawPut: - req.RawPut().Context = ctx - case CmdRawBatchPut: - req.RawBatchPut().Context = ctx - case CmdRawDelete: - req.RawDelete().Context = ctx - case CmdRawBatchDelete: - req.RawBatchDelete().Context = ctx - case CmdRawDeleteRange: - req.RawDeleteRange().Context = ctx - case CmdRawScan: - req.RawScan().Context = ctx - case CmdGetKeyTTL: - req.RawGetKeyTTL().Context = ctx - case CmdRawCompareAndSwap: - req.RawCompareAndSwap().Context = ctx - case CmdRawChecksum: - req.RawChecksum().Context = ctx - case CmdUnsafeDestroyRange: - req.UnsafeDestroyRange().Context = ctx - case CmdRegisterLockObserver: - req.RegisterLockObserver().Context = ctx - case CmdCheckLockObserver: - req.CheckLockObserver().Context = ctx - case CmdRemoveLockObserver: - req.RemoveLockObserver().Context = ctx - case CmdPhysicalScanLock: - req.PhysicalScanLock().Context = ctx - case CmdCop: - req.Cop().Context = ctx - case CmdCopStream: - req.Cop().Context = ctx - case CmdBatchCop: - req.BatchCop().Context = ctx // Dispatching MPP tasks don't need a region context, because it's a request for store but not region. case CmdMPPTask: case CmdMPPConn: case CmdMPPCancel: case CmdMPPAlive: - case CmdMvccGetByKey: - req.MvccGetByKey().Context = ctx - case CmdMvccGetByStartTs: - req.MvccGetByStartTs().Context = ctx - case CmdSplitRegion: - req.SplitRegion().Context = ctx + // Empty command doesn't need a region context. 
case CmdEmpty: - req.SplitRegion().Context = ctx - case CmdTxnHeartBeat: - req.TxnHeartBeat().Context = ctx - case CmdCheckTxnStatus: - req.CheckTxnStatus().Context = ctx - case CmdCheckSecondaryLocks: - req.CheckSecondaryLocks().Context = ctx - case CmdFlashbackToVersion: - req.FlashbackToVersion().Context = ctx - case CmdPrepareFlashbackToVersion: - req.PrepareFlashbackToVersion().Context = ctx - case CmdFlush: - req.Flush().Context = ctx - case CmdBufferBatchGet: - req.BufferBatchGet().Context = ctx + default: return false } diff --git a/tikvrpc/tikvrpc_test.go b/tikvrpc/tikvrpc_test.go index e3d5e25fb3..5a301e09c9 100644 --- a/tikvrpc/tikvrpc_test.go +++ b/tikvrpc/tikvrpc_test.go @@ -35,8 +35,14 @@ package tikvrpc import ( + "fmt" + "math/rand" + "sync" "testing" + "time" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/stretchr/testify/assert" ) @@ -47,3 +53,87 @@ func TestBatchResponse(t *testing.T) { assert.Nil(t, batchResp) assert.NotNil(t, err) } + +// https://github.com/pingcap/tidb/issues/51921 +func TestTiDB51921(t *testing.T) { + for _, r := range []*Request{ + NewRequest(CmdGet, &kvrpcpb.GetRequest{}), + NewRequest(CmdScan, &kvrpcpb.ScanRequest{}), + NewRequest(CmdPrewrite, &kvrpcpb.PrewriteRequest{}), + NewRequest(CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{}), + NewRequest(CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{}), + NewRequest(CmdCommit, &kvrpcpb.CommitRequest{}), + NewRequest(CmdCleanup, &kvrpcpb.CleanupRequest{}), + NewRequest(CmdBatchGet, &kvrpcpb.BatchGetRequest{}), + NewRequest(CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{}), + NewRequest(CmdScanLock, &kvrpcpb.ScanLockRequest{}), + NewRequest(CmdResolveLock, &kvrpcpb.ResolveLockRequest{}), + NewRequest(CmdGC, &kvrpcpb.GCRequest{}), + NewRequest(CmdDeleteRange, &kvrpcpb.DeleteRangeRequest{}), + NewRequest(CmdRawGet, &kvrpcpb.RawGetRequest{}), + NewRequest(CmdRawBatchGet, &kvrpcpb.RawBatchGetRequest{}), + NewRequest(CmdRawPut, &kvrpcpb.RawPutRequest{}), + NewRequest(CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{}), + NewRequest(CmdRawDelete, &kvrpcpb.RawDeleteRequest{}), + NewRequest(CmdRawBatchDelete, &kvrpcpb.RawBatchDeleteRequest{}), + NewRequest(CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{}), + NewRequest(CmdRawScan, &kvrpcpb.RawScanRequest{}), + NewRequest(CmdRawGetKeyTTL, &kvrpcpb.RawGetKeyTTLRequest{}), + NewRequest(CmdRawCompareAndSwap, &kvrpcpb.RawCASRequest{}), + NewRequest(CmdRawChecksum, &kvrpcpb.RawChecksumRequest{}), + NewRequest(CmdUnsafeDestroyRange, &kvrpcpb.UnsafeDestroyRangeRequest{}), + NewRequest(CmdRegisterLockObserver, &kvrpcpb.RegisterLockObserverRequest{}), + NewRequest(CmdCheckLockObserver, &kvrpcpb.CheckLockObserverRequest{}), + NewRequest(CmdRemoveLockObserver, &kvrpcpb.RemoveLockObserverRequest{}), + NewRequest(CmdPhysicalScanLock, &kvrpcpb.PhysicalScanLockRequest{}), + NewRequest(CmdCop, &coprocessor.Request{}), + NewRequest(CmdCopStream, &coprocessor.Request{}), + NewRequest(CmdBatchCop, &coprocessor.BatchRequest{}), + NewRequest(CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{}), + NewRequest(CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{}), + NewRequest(CmdSplitRegion, &kvrpcpb.SplitRegionRequest{}), + NewRequest(CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{}), + NewRequest(CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{}), + NewRequest(CmdCheckSecondaryLocks, &kvrpcpb.CheckSecondaryLocksRequest{}), + NewRequest(CmdFlashbackToVersion, 
&kvrpcpb.FlashbackToVersionRequest{}), + NewRequest(CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{}), + NewRequest(CmdFlush, &kvrpcpb.FlushRequest{}), + NewRequest(CmdBufferBatchGet, &kvrpcpb.BufferBatchGetRequest{}), + } { + req := r + t.Run(fmt.Sprintf("%s#%d", req.Type.String(), req.Type), func(t *testing.T) { + if req.ToBatchCommandsRequest() == nil { + t.Skipf("%s doesn't support batch commands", req.Type.String()) + } + done := make(chan struct{}) + cmds := make(chan *tikvpb.BatchCommandsRequest_Request, 8) + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + for { + select { + case <-done: + close(cmds) + return + default: + // mock relocate and retry + AttachContext(req, kvrpcpb.Context{RegionId: rand.Uint64()}) + cmds <- req.ToBatchCommandsRequest() + } + } + }() + go func() { + defer wg.Done() + for cmd := range cmds { + // mock send and marshal in batch-send-loop + cmd.Marshal() + } + }() + + time.Sleep(time.Second / 4) + close(done) + wg.Wait() + }) + } +} From 603dc7b2e77a95a26e6483c115d692390741e203 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Wed, 27 Mar 2024 10:30:46 +0800 Subject: [PATCH 3/4] cop: add kv read wall time to print result (#1248) * add kv read wall time to print result Signed-off-by: cfzjywxk * format Signed-off-by: cfzjywxk --------- Signed-off-by: cfzjywxk --- util/execdetails.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/util/execdetails.go b/util/execdetails.go index 30f108a4a0..a9db7a7cea 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -653,6 +653,13 @@ func (td *TimeDetail) String() string { buf.WriteString("total_wait_time: ") buf.WriteString(FormatDuration(td.WaitTime)) } + if td.KvReadWallTime > 0 { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString("total_kv_read_wall_time: ") + buf.WriteString(FormatDuration(td.KvReadWallTime)) + } if td.TotalRPCWallTime > 0 { if buf.Len() > 0 { buf.WriteString(", ") @@ -663,6 +670,17 @@ func (td *TimeDetail) String() string { return buf.String() } +// Merge merges the time detail into itself. +func (td *TimeDetail) Merge(detail *TimeDetail) { + if detail != nil { + td.ProcessTime += detail.ProcessTime + td.SuspendTime += detail.SuspendTime + td.WaitTime += detail.WaitTime + td.KvReadWallTime += detail.KvReadWallTime + td.TotalRPCWallTime += detail.TotalRPCWallTime + } +} + // MergeFromTimeDetail merges time detail from pb into itself. 
func (td *TimeDetail) MergeFromTimeDetail(timeDetailV2 *kvrpcpb.TimeDetailV2, timeDetail *kvrpcpb.TimeDetail) { if timeDetailV2 != nil { From 81d8dea0ebc465dadc38c7797669e5c4dcad0359 Mon Sep 17 00:00:00 2001 From: zyguan Date: Thu, 28 Mar 2024 14:00:22 +0800 Subject: [PATCH 4/4] tikv: ensure safe-ts won't be max uint64 (#1250) * tikv: ensure safe-ts won't be max uint64 Signed-off-by: zyguan * fix a typo Signed-off-by: zyguan * fix lint issue Signed-off-by: zyguan * address the comment and fix test Signed-off-by: zyguan --------- Signed-off-by: zyguan --- integration_tests/pd_api_test.go | 5 +- tikv/kv.go | 35 +++++-- tikv/kv_test.go | 160 ++++++++++++++++++++++++++----- 3 files changed, 166 insertions(+), 34 deletions(-) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 2da4c4bfb0..add750f565 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -17,7 +17,6 @@ package tikv_test import ( "context" "fmt" - "math" "strings" "sync/atomic" "testing" @@ -183,8 +182,8 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { // Try to get the minimum resolved timestamp of the cluster from TiKV. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) // Make sure the store's min resolved ts is not initialized. - s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64) - require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 0) + require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. diff --git a/tikv/kv.go b/tikv/kv.go index 172f6f614e..34b5fa24df 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -572,6 +572,15 @@ func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { return 0 } +func (s *KVStore) setMinSafeTS(txnScope string, safeTS uint64) { + // ensure safeTS is not set to max uint64 + if safeTS == math.MaxUint64 { + logutil.BgLogger().Warn("skip setting min-safe-ts to max uint64", zap.String("txnScope", txnScope), zap.Stack("stack")) + return + } + s.minSafeTS.Store(txnScope, safeTS) +} + // Ctx returns ctx. 
func (s *KVStore) Ctx() context.Context { return s.ctx @@ -607,6 +616,11 @@ func (s *KVStore) getSafeTS(storeID uint64) (bool, uint64) { // setSafeTS sets safeTs for store storeID, export for testing func (s *KVStore) setSafeTS(storeID, safeTS uint64) { + // ensure safeTS is not set to max uint64 + if safeTS == math.MaxUint64 { + logutil.BgLogger().Warn("skip setting safe-ts to max uint64", zap.Uint64("storeID", storeID), zap.Stack("stack")) + return + } s.safeTSMap.Store(storeID, safeTS) } @@ -614,11 +628,12 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { minSafeTS := uint64(math.MaxUint64) // when there is no store, return 0 in order to let minStartTS become startTS directly if len(storeIDs) < 1 { - s.minSafeTS.Store(txnScope, 0) + s.setMinSafeTS(txnScope, 0) } for _, store := range storeIDs { ok, safeTS := s.getSafeTS(store) if ok { + // safeTS is guaranteed to be less than math.MaxUint64 (by setSafeTS and its callers) if safeTS != 0 && safeTS < minSafeTS { minSafeTS = safeTS } @@ -626,7 +641,7 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { minSafeTS = 0 } } - s.minSafeTS.Store(txnScope, minSafeTS) + s.setMinSafeTS(txnScope, minSafeTS) } func (s *KVStore) safeTSUpdater() { @@ -690,8 +705,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { safeTS uint64 storeIDStr = strconv.FormatUint(storeID, 10) ) - // If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV. - if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil { + // If getting the minimum resolved timestamp from PD failed or returned 0/MaxUint64, try to get it from TiKV. + if storeMinResolvedTSs == nil || !isValidSafeTS(storeMinResolvedTSs[storeID]) || err != nil { resp, err := tikvClient.SendRequest( ctx, storeAddr, tikvrpc.NewRequest( tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{ @@ -785,17 +800,17 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil) if err != nil { logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) - } else if clusterMinSafeTS != 0 { + } else if isValidSafeTS(clusterMinSafeTS) { // Update ts and metrics. preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) - // If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized. + // preClusterMinSafeTS is guaranteed to be less than math.MaxUint64 (by this method and setMinSafeTS) // related to https://github.com/tikv/client-go/issues/991 - if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS { + if preClusterMinSafeTS > clusterMinSafeTS { skipSafeTSUpdateCounter.Inc() preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds()) } else { - s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS) + s.setMinSafeTS(oracle.GlobalTxnScope, clusterMinSafeTS) successSafeTSUpdateCounter.Inc() safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds()) @@ -807,6 +822,10 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { return false } +func isValidSafeTS(ts uint64) bool { + return ts != 0 && ts != math.MaxUint64 +} + // EnableResourceControl enables the resource control. 
func EnableResourceControl() { client.ResourceControlSwitch.Store(true) diff --git a/tikv/kv_test.go b/tikv/kv_test.go index 9f9af85006..edf6182ed5 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -17,6 +17,7 @@ package tikv import ( "context" "fmt" + "math" "sync/atomic" "testing" "time" @@ -29,6 +30,7 @@ import ( "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" + pdhttp "github.com/tikv/pd/client/http" ) func TestKV(t *testing.T) { @@ -38,18 +40,27 @@ func TestKV(t *testing.T) { type testKVSuite struct { suite.Suite - store *KVStore - cluster *mocktikv.Cluster - tikvStoreID uint64 - tiflashStoreID uint64 - tiflashPeerStoreID uint64 + store *KVStore + cluster *mocktikv.Cluster + tikvStoreID uint64 + tiflashStoreID uint64 + + mockGetMinResolvedTSByStoresIDs atomic.Pointer[func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error)] } func (s *testKVSuite) SetupTest() { client, cluster, pdClient, err := testutils.NewMockTiKV("", nil) s.Require().Nil(err) testutils.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { + return 0, nil, nil + }) + store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0, Option(func(store *KVStore) { + store.pdHttpClient = &mockPDHTTPClient{ + Client: pdhttp.NewClientWithServiceDiscovery("test", nil), + mockGetMinResolvedTSByStoresIDs: &s.mockGetMinResolvedTSByStoresIDs, + } + })) s.Require().Nil(err) s.store = store @@ -58,14 +69,18 @@ func (s *testKVSuite) SetupTest() { storeIDs, _, _, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2) s.tikvStoreID = storeIDs[0] s.tiflashStoreID = storeIDs[1] - tiflashPeerAddrID := cluster.AllocIDs(1) - s.tiflashPeerStoreID = tiflashPeerAddrID[0] - s.cluster.UpdateStorePeerAddr(s.tiflashStoreID, s.storeAddr(s.tiflashPeerStoreID), &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) - s.store.regionCache.SetRegionCacheStore(s.tikvStoreID, s.storeAddr(s.tikvStoreID), s.storeAddr(s.tikvStoreID), tikvrpc.TiKV, 1, nil) var labels []*metapb.StoreLabel - labels = append(labels, &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) - s.store.regionCache.SetRegionCacheStore(s.tiflashStoreID, s.storeAddr(s.tiflashStoreID), s.storeAddr(s.tiflashPeerStoreID), tikvrpc.TiFlash, 1, labels) + labels = append(cluster.GetStore(s.tikvStoreID).Labels, + &metapb.StoreLabel{Key: DCLabelKey, Value: "z1"}) + s.cluster.UpdateStorePeerAddr(s.tikvStoreID, s.storeAddr(s.tikvStoreID), labels...) + s.store.regionCache.SetRegionCacheStore(s.tikvStoreID, s.storeAddr(s.tikvStoreID), s.storeAddr(s.tikvStoreID), tikvrpc.TiKV, 1, labels) + + labels = append(cluster.GetStore(s.tiflashStoreID).Labels, + &metapb.StoreLabel{Key: DCLabelKey, Value: "z2"}, + &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) + s.cluster.UpdateStorePeerAddr(s.tiflashStoreID, s.storeAddr(s.tiflashStoreID), labels...) 
+ s.store.regionCache.SetRegionCacheStore(s.tiflashStoreID, s.storeAddr(s.tiflashStoreID), s.storeAddr(s.tiflashStoreID), tikvrpc.TiFlash, 1, labels) } @@ -77,6 +92,10 @@ func (s *testKVSuite) storeAddr(id uint64) string { return fmt.Sprintf("store%d", id) } +func (s *testKVSuite) setGetMinResolvedTSByStoresIDs(f func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error)) { + s.mockGetMinResolvedTSByStoresIDs.Store(&f) +} + type storeSafeTsMockClient struct { Client requestCount int32 @@ -89,7 +108,7 @@ func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, re } atomic.AddInt32(&c.requestCount, 1) resp := &tikvrpc.Response{} - if addr == c.testSuite.storeAddr(c.testSuite.tiflashPeerStoreID) { + if addr == c.testSuite.storeAddr(c.testSuite.tiflashStoreID) { resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 80} } else { resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 100} @@ -105,22 +124,117 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { return c.Client.CloseAddr(addr) } -func (s *testKVSuite) TestMinSafeTs() { +type mockPDHTTPClient struct { + pdhttp.Client + mockGetMinResolvedTSByStoresIDs *atomic.Pointer[func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error)] +} + +func (c *mockPDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { + if f := c.mockGetMinResolvedTSByStoresIDs.Load(); f != nil { + return (*f)(ctx, storeIDs) + } + return c.Client.GetMinResolvedTSByStoresIDs(ctx, storeIDs) +} + +func (s *testKVSuite) TestMinSafeTsFromStores() { mockClient := storeSafeTsMockClient{ Client: s.store.GetTiKVClient(), testSuite: s, } s.store.SetTiKVClient(&mockClient) - // wait for updateMinSafeTS - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 80 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + s.Require().False(math.MaxUint64 == ts) + return ts == 80 + }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + ok, ts := s.store.getSafeTS(s.tikvStoreID) + s.Require().True(ok) + s.Require().Equal(uint64(100), ts) +} + +func (s *testKVSuite) TestMinSafeTsFromPD() { + mockClient := storeSafeTsMockClient{Client: s.store.GetTiKVClient(), testSuite: s} + s.store.SetTiKVClient(&mockClient) + s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { + return 90, nil, nil + }) + s.Eventually(func() bool { + ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + s.Require().False(math.MaxUint64 == ts) + return ts == 90 + }, 15*time.Second, time.Second) + s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) + s.Require().Equal(uint64(90), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) +} + +func (s *testKVSuite) TestMinSafeTsFromPDByStores() { + mockClient := storeSafeTsMockClient{Client: s.store.GetTiKVClient(), testSuite: s} + s.store.SetTiKVClient(&mockClient) + s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { + m := make(map[uint64]uint64) + for _, id := range ids { + m[id] = uint64(100) + id + } + return math.MaxUint64, m, nil + }) + s.Eventually(func() bool { + ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + s.Require().False(math.MaxUint64 == ts) + return ts == 
uint64(100)+s.tikvStoreID + }, 15*time.Second, time.Second) + s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) + s.Require().Equal(uint64(100)+s.tikvStoreID, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) +} + +func (s *testKVSuite) TestMinSafeTsFromMixed1() { + mockClient := storeSafeTsMockClient{Client: s.store.GetTiKVClient(), testSuite: s} + s.store.SetTiKVClient(&mockClient) + s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { + m := make(map[uint64]uint64) + for _, id := range ids { + if id == s.tiflashStoreID { + m[id] = 0 + } else { + m[id] = uint64(10) + } + } + return math.MaxUint64, m, nil + }) + s.Eventually(func() bool { + ts := s.store.GetMinSafeTS("z1") + s.Require().False(math.MaxUint64 == ts) + return ts == uint64(10) + }, 15*time.Second, time.Second) + s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z1")) + s.Require().Equal(uint64(80), s.store.GetMinSafeTS("z2")) +} + +func (s *testKVSuite) TestMinSafeTsFromMixed2() { + mockClient := storeSafeTsMockClient{Client: s.store.GetTiKVClient(), testSuite: s} + s.store.SetTiKVClient(&mockClient) + s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { + m := make(map[uint64]uint64) + for _, id := range ids { + if id == s.tiflashStoreID { + m[id] = uint64(10) + } else { + m[id] = math.MaxUint64 + } + } + return math.MaxUint64, m, nil + }) + s.Eventually(func() bool { + ts := s.store.GetMinSafeTS("z2") + s.Require().False(math.MaxUint64 == ts) + return ts == uint64(10) + }, 15*time.Second, time.Second) + s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(100), s.store.GetMinSafeTS("z1")) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z2")) }