diff --git a/tests/realtikvtest/txntest/BUILD.bazel b/tests/realtikvtest/txntest/BUILD.bazel
index a688a8caac32d..510bb0ecfbb59 100644
--- a/tests/realtikvtest/txntest/BUILD.bazel
+++ b/tests/realtikvtest/txntest/BUILD.bazel
@@ -12,6 +12,7 @@ go_test(
     flaky = True,
     race = "on",
     deps = [
+<<<<<<< HEAD
         "//expression",
         "//kv",
         "//parser",
@@ -20,8 +21,22 @@ go_test(
         "//testkit",
         "//tests/realtikvtest",
         "//util",
+=======
+        "//pkg/config",
+        "//pkg/errno",
+        "//pkg/expression",
+        "//pkg/kv",
+        "//pkg/parser",
+        "//pkg/session/txninfo",
+        "//pkg/store/driver",
+        "//pkg/testkit",
+        "//pkg/util",
+        "//tests/realtikvtest",
+        "@com_github_pingcap_errors//:errors",
+>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
+        "@com_github_tikv_client_go_v2//tikvrpc",
        "@io_opencensus_go//stats/view",
    ],
)
diff --git a/tests/realtikvtest/txntest/txn_test.go b/tests/realtikvtest/txntest/txn_test.go
index 908329d32f618..5b81e726d155f 100644
--- a/tests/realtikvtest/txntest/txn_test.go
+++ b/tests/realtikvtest/txntest/txn_test.go
@@ -15,15 +15,28 @@ package txntest
 import (
+	"bytes"
 	"context"
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
+<<<<<<< HEAD
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/testkit"
+=======
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/errno"
+	"github.com/pingcap/tidb/pkg/expression"
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/testkit"
+>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))
 	"github.com/pingcap/tidb/tests/realtikvtest"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/tikvrpc"
 )
 
 func TestInTxnPSProtoPointGet(t *testing.T) {
@@ -251,3 +264,279 @@ func TestAssertionWhenPessimisticLockLost(t *testing.T) {
 	err := tk1.ExecToErr("commit")
 	require.NotContains(t, err.Error(), "assertion")
 }
+<<<<<<< HEAD
+=======
+
+func TestSelectLockForPartitionTable(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk1 := testkit.NewTestKit(t, store)
+	tk2 := testkit.NewTestKit(t, store)
+
+	tk1.MustExec("use test")
+	tk1.MustExec("create table t(a int, b int, c int, key idx(a, b, c)) PARTITION BY HASH (c) PARTITIONS 10")
+	tk1.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
+	tk1.MustExec("analyze table t")
+	tk1.MustExec("begin")
+	tk1.MustHavePlan("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update", "IndexLookUp")
+	tk1.MustExec("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update")
+	ch := make(chan bool, 1)
+	go func() {
+		tk2.MustExec("use test")
+		tk2.MustExec("begin")
+		ch <- false
+		// block here until tk1 finishes
+		tk2.MustExec("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update")
+		ch <- true
+	}()
+
+	res := <-ch
+	// Sleep here to make sure the SelectLock stmt is executed
+	time.Sleep(10 * time.Millisecond)
+
+	select {
+	case res = <-ch:
+	default:
+	}
+	require.False(t, res)
+
+	tk1.MustExec("commit")
+	// wait until tk2 finishes
+	res = <-ch
+	require.True(t, res)
+}
+
+func TestTxnEntrySizeLimit(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk1 := testkit.NewTestKit(t, store)
+	tk2 := testkit.NewTestKit(t, store)
+	tk1.MustExec("use test")
+	tk2.MustExec("use test")
+	tk1.MustExec("create table t (a int, b longtext)")
+
+	// cannot insert a large entry by default
+	tk1.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+
+	// increasing the entry size limit allows the user to write large entries
+	tk1.MustExec("set session tidb_txn_entry_size_limit=8388608")
+	tk1.MustExec("insert into t values (1, repeat('a', 7340032))")
+	tk1.MustContainErrMsg("insert into t values (1, repeat('a', 9427968))", "[kv:8025]entry too large, the max entry size is 8388608")
+
+	// update session var does not affect other sessions
+	tk2.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+	tk3 := testkit.NewTestKit(t, store)
+	tk3.MustExec("use test")
+	tk3.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+
+	// update session var does not affect internal session used by ddl backfilling
+	tk1.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
+
+	// update global var allows ddl backfilling to write large entries
+	tk1.MustExec("set global tidb_txn_entry_size_limit=8388608")
+	tk1.MustExec("alter table t modify column a varchar(255)")
+	tk2.MustExec("alter table t modify column a int")
+
+	// update global var does not affect existing sessions
+	tk2.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+	tk3.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+
+	// update global var affects new sessions
+	tk4 := testkit.NewTestKit(t, store)
+	tk4.MustExec("use test")
+	tk4.MustExec("insert into t values (2, repeat('b', 7340032))")
+
+	// reset global var to default
+	tk1.MustExec("set global tidb_txn_entry_size_limit=0")
+	tk1.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
+	tk2.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
+	tk3.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
+	tk4.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
+
+	// reset session var to default
+	tk1.MustExec("insert into t values (3, repeat('c', 7340032))")
+	tk1.MustExec("set session tidb_txn_entry_size_limit=0")
+	tk1.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
+}
+
+func TestCheckTxnStatusOnOptimisticTxnBreakConsistency(t *testing.T) {
+	// This test case covers the issue #51666 (tikv#16620).
+	if !*realtikvtest.WithRealTiKV {
+		t.Skip("skip due to not supporting mock storage")
+	}
+
+	// Allow async commit
+	defer config.RestoreFunc()()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TiKVClient.AsyncCommit.SafeWindow = 500 * time.Millisecond
+		conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
+	})
+
+	// A helper function to determine whether a KV RPC request is handled on TiKV without RPC error or region error.
+	isRequestHandled := func(resp *tikvrpc.Response, err error) bool {
+		if err != nil || resp == nil {
+			return false
+		}
+
+		regionErr, err := resp.GetRegionError()
+		if err != nil || regionErr != nil {
+			return false
+		}
+
+		return true
+	}
+
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tkPrepare1 := testkit.NewTestKit(t, store)
+	tkPrepare2 := testkit.NewTestKit(t, store)
+	tk1 := testkit.NewTestKit(t, store)
+	tk2 := testkit.NewTestKit(t, store)
+	tkPrepare1.MustExec("use test")
+	tkPrepare2.MustExec("use test")
+	tk1.MustExec("use test")
+	tk2.MustExec("use test")
+
+	tk1.MustExec("create table t (id int primary key, v int)")
+	tk1.MustExec("insert into t values (1, 10), (2, 20)")
+	// Table t2 is used to reveal the possibility that the issue causes data-index inconsistency.
+	tk1.MustExec("create table t2 (id int primary key, v int unique)")
+	tk1.MustExec("insert into t2 values (1, 10)")
+
+	tkPrepare1.MustExec("set @@tidb_enable_async_commit = 1")
+	tk1.MustExec("set @@tidb_enable_async_commit = 0")
+
+	// Prepare a ts collision (currentTxn.StartTS == lastTxn.CommitTS on the same key).
+	// Loop until we successfully prepare one.
+	var lastCommitTS uint64
+	for constructionIters := 0; ; constructionIters++ {
+		// Reset the value which might have been updated in the previous attempt.
+		tkPrepare1.MustExec("update t set v = 10 where id = 1")
+
+		// Update row 1 in async commit mode
+		require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))
+		tkPrepare1Ch := make(chan struct{})
+		go func() {
+			tkPrepare1.MustExec("update t set v = v + 1 where id = 1")
+			tkPrepare1Ch <- struct{}{}
+		}()
+
+		// tkPrepare2 updates TiKV's max_ts by reading. Assuming tkPrepare2's read happens just before tk1's BEGIN,
+		// we expect tk1 to have startTS == tkPrepare2.startTS + 1, so that tk1.startTS == TiKV's min_commit_ts.
+		tkPrepare2.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
+		tk1.MustExec("begin optimistic")
+
+		require.NoError(t, failpoint.Disable("tikvclient/beforePrewrite"))
+		select {
+		case <-tkPrepare1Ch:
+		case <-time.After(time.Second):
+			require.Fail(t, "tkPrepare1 not resumed after unsetting failpoint")
+		}
+
+		var err error
+		lastCommitTS, err = strconv.ParseUint(tkPrepare1.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64)
+		require.NoError(t, err)
+		currentStartTS, err := strconv.ParseUint(tk1.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64)
+		require.NoError(t, err)
+		if currentStartTS == lastCommitTS {
+			break
+		}
+		// Abandon and retry.
+		tk1.MustExec("rollback")
+		if constructionIters >= 1000 {
+			require.Fail(t, "failed to construct the ts collision situation of async commit transaction")
+		}
+	}
+
+	// Now tk1 is in a transaction whose start ts collides with the commit ts of a previously committed transaction
+	// that has written row 1. The ts is in variable `lastCommitTS`.
+
+	tk1.MustExec("update t set v = v + 100 where id = 1")
+	tk1.MustExec("update t set v = v + 100 where id = 2")
+	tk1.MustExec("update t2 set v = v + 1 where id = 1")
+
+	// We will construct the following committing procedure for the transaction in tk1:
+	//  1. It successfully prewrites all keys but fails to receive the response of the request that prewrites the
+	//     primary (by simulating an RPC error);
+	//  2. tk2 tries to access keys that were already locked by tk1, and performs resolve-locks. When the issue exists,
+	//     the primary may be rolled back without any rollback record.
+	//  3. tk1 continues and retries prewriting the primary. In normal cases, it should not succeed as the transaction
+	//     should have been rolled back by tk2's resolve-locks operation, but it does succeed when the issue is present.
+	// To simulate this procedure for tk1's commit, we use the onRPCFinishedHook failpoint to inject a hook
+	// when committing that makes the first prewrite on tk1's primary fail, and blocks until signaled by the channel
+	// `continueCommittingSignalCh`.
+
+	require.NoError(t, failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
+	require.NoError(t, failpoint.Enable("tikvclient/doNotKeepAlive", "return"))
+	require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
+	require.NoError(t, failpoint.Enable("tikvclient/onRPCFinishedHook", "return"))
+
+	defer func() {
+		require.NoError(t, failpoint.Disable("tikvclient/twoPCShortLockTTL"))
+		require.NoError(t, failpoint.Disable("tikvclient/doNotKeepAlive"))
+		require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
+		require.NoError(t, failpoint.Disable("tikvclient/onRPCFinishedHook"))
+	}()
+
+	continueCommittingSignalCh := make(chan struct{})
+
+	primaryReqCount := 0
+	onRPCFinishedHook := func(req *tikvrpc.Request, resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
+		if req.Type == tikvrpc.CmdPrewrite {
+			prewriteReq := req.Prewrite()
+			// The failpoint "twoPCRequestBatchSizeLimit" must take effect
+			require.Equal(t, 1, len(prewriteReq.GetMutations()))
+			if prewriteReq.GetStartVersion() == lastCommitTS &&
+				bytes.Equal(prewriteReq.GetMutations()[0].Key, prewriteReq.PrimaryLock) &&
+				isRequestHandled(resp, err) {
+				primaryReqCount++
+				if primaryReqCount == 1 {
+					// Block until signaled
+					<-continueCommittingSignalCh
+					// Simulate an RPC failure (but TiKV successfully handled the request) for the first attempt
+					return nil, errors.New("injected rpc error in onRPCFinishedHook")
+				}
+			}
+		}
+		return resp, err
+	}
+
+	ctxWithHook := context.WithValue(context.Background(), "onRPCFinishedHook", onRPCFinishedHook)
+
+	resCh := make(chan error)
+	go func() {
+		_, err := tk1.ExecWithContext(ctxWithHook, "commit")
+		resCh <- err
+	}()
+	// tk1 must be blocked by the hook function.
+	select {
+	case err := <-resCh:
+		require.Fail(t, "tk1 not blocked, result: "+fmt.Sprintf("%+q", err))
+	case <-time.After(time.Millisecond * 50):
+	}
+
+	// tk2 conflicts with tk1 and rolls back tk1 by resolving locks.
+	tk2.MustExec("update t set v = v + 1 where id = 2")
+	tk2.MustExec("insert into t2 values (2, 11)")
+
+	// tk1 must still be blocked
+	select {
+	case err := <-resCh:
+		require.Fail(t, "tk1 not blocked, result: "+fmt.Sprintf("%+q", err))
+	case <-time.After(time.Millisecond * 50):
+	}
+
+	// Signal tk1 to continue (retry the prewrite request and proceed with committing).
+	close(continueCommittingSignalCh)
+
+	var err error
+	select {
+	case err = <-resCh:
+	case <-time.After(time.Second):
+		require.Fail(t, "tk1 not resumed")
+	}
+
+	require.Error(t, err)
+	require.Equal(t, errno.ErrWriteConflict, int(errors.Cause(err).(*errors.Error).Code()))
+	tk2.MustQuery("select * from t order by id").Check(testkit.Rows("1 11", "2 21"))
+	tk2.MustExec("admin check table t2")
+	tk2.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 10", "2 11"))
+}
+>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))