txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667) #52538

Closed
15 changes: 15 additions & 0 deletions tests/realtikvtest/txntest/BUILD.bazel
@@ -12,6 +12,7 @@ go_test(
flaky = True,
race = "on",
deps = [
<<<<<<< HEAD
"//expression",
"//kv",
"//parser",
@@ -20,8 +21,22 @@
"//testkit",
"//tests/realtikvtest",
"//util",
=======
"//pkg/config",
"//pkg/errno",
"//pkg/expression",
"//pkg/kv",
"//pkg/parser",
"//pkg/session/txninfo",
"//pkg/store/driver",
"//pkg/testkit",
"//pkg/util",
"//tests/realtikvtest",
"@com_github_pingcap_errors//:errors",
>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_opencensus_go//stats/view",
],
)
289 changes: 289 additions & 0 deletions tests/realtikvtest/txntest/txn_test.go
@@ -15,15 +15,28 @@
package txntest

import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"

<<<<<<< HEAD
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/testkit"
=======
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestInTxnPSProtoPointGet(t *testing.T) {
@@ -251,3 +264,279 @@ func TestAssertionWhenPessimisticLockLost(t *testing.T) {
err := tk1.ExecToErr("commit")
require.NotContains(t, err.Error(), "assertion")
}
<<<<<<< HEAD
=======

func TestSelectLockForPartitionTable(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

tk1.MustExec("use test")
tk1.MustExec("create table t(a int, b int, c int, key idx(a, b, c)) PARTITION BY HASH (c) PARTITIONS 10")
tk1.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
tk1.MustExec("analyze table t")
tk1.MustExec("begin")
tk1.MustHavePlan("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update", "IndexLookUp")
tk1.MustExec("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update")
ch := make(chan bool, 1)
go func() {
tk2.MustExec("use test")
tk2.MustExec("begin")
ch <- false
// block here until tk1 finishes
tk2.MustExec("select * from t use index(idx) where a = 1 and b = 1 order by a limit 1 for update")
ch <- true
}()
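// The first value on ch only confirms that tk2's goroutine has begun its transaction;
// the second value is sent only after tk2's SELECT ... FOR UPDATE returns, so it must
// not arrive while tk1 still holds the row lock.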

res := <-ch
// Sleep here to make sure tk2's SelectLock statement has started and is blocked
time.Sleep(10 * time.Millisecond)

select {
case res = <-ch:
default:
}
require.False(t, res)

tk1.MustExec("commit")
// wait until tk2 finishes
res = <-ch
require.True(t, res)
}

func TestTxnEntrySizeLimit(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("create table t (a int, b longtext)")
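// For reference: 6291456 bytes = 6 MiB (the default entry size limit), 7340032 = 7 MiB,
// 8388608 = 8 MiB, and 9427968 is slightly below 9 MiB, so every payload below is sized
// just above or below the limit being exercised.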

// cannot insert a large entry by default
tk1.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")

// increasing the entry size limit allows writing large entries
tk1.MustExec("set session tidb_txn_entry_size_limit=8388608")
tk1.MustExec("insert into t values (1, repeat('a', 7340032))")
tk1.MustContainErrMsg("insert into t values (1, repeat('a', 9427968))", "[kv:8025]entry too large, the max entry size is 8388608")

// update session var does not affect other sessions
tk2.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
tk3.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")

// update session var does not affect internal session used by ddl backfilling
tk1.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")

// updating the global var allows ddl backfilling to write large entries
tk1.MustExec("set global tidb_txn_entry_size_limit=8388608")
tk1.MustExec("alter table t modify column a varchar(255)")
tk2.MustExec("alter table t modify column a int")

// update global var does not affect existing sessions
tk2.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
tk3.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")

// update global var affects new sessions
tk4 := testkit.NewTestKit(t, store)
tk4.MustExec("use test")
tk4.MustExec("insert into t values (2, repeat('b', 7340032))")

// reset global var to default
tk1.MustExec("set global tidb_txn_entry_size_limit=0")
tk1.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
tk2.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
tk3.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")
tk4.MustContainErrMsg("alter table t modify column a varchar(255)", "[kv:8025]entry too large, the max entry size is 6291456")

// reset session var to default
tk1.MustExec("insert into t values (3, repeat('c', 7340032))")
tk1.MustExec("set session tidb_txn_entry_size_limit=0")
tk1.MustContainErrMsg("insert into t values (1, repeat('a', 7340032))", "[kv:8025]entry too large, the max entry size is 6291456")
}

func TestCheckTxnStatusOnOptimisticTxnBreakConsistency(t *testing.T) {
// This test case covers issue #51666 (tikv#16620).
if !*realtikvtest.WithRealTiKV {
t.Skip("this test requires a real TiKV store; mock storage is not supported")
}

// Allow async commit
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 500 * time.Millisecond
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})
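// Async commit matters here because an async-commit transaction derives its commit_ts
// from TiKV's min_commit_ts (based on max_ts + 1) instead of a fresh PD TSO, which is
// what makes the start_ts == commit_ts collision constructed below possible.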

// A helper function to determine whether a KV RPC request was handled by TiKV without an RPC error or region error.
isRequestHandled := func(resp *tikvrpc.Response, err error) bool {
if err != nil || resp == nil {
return false
}

regionErr, err := resp.GetRegionError()
if err != nil || regionErr != nil {
return false
}

return true
}

store := realtikvtest.CreateMockStoreAndSetup(t)
tkPrepare1 := testkit.NewTestKit(t, store)
tkPrepare2 := testkit.NewTestKit(t, store)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tkPrepare1.MustExec("use test")
tkPrepare2.MustExec("use test")
tk1.MustExec("use test")
tk2.MustExec("use test")

tk1.MustExec("create table t (id int primary key, v int)")
tk1.MustExec("insert into t values (1, 10), (2, 20)")
// Table t2 is used to show that the issue can cause data-index inconsistency.
tk1.MustExec("create table t2 (id int primary key, v int unique)")
tk1.MustExec("insert into t2 values (1, 10)")

tkPrepare1.MustExec("set @@tidb_enable_async_commit = 1")
tk1.MustExec("set @@tidb_enable_async_commit = 0")
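// tkPrepare1 commits with async commit so that its commit_ts comes from TiKV's
// min_commit_ts, while tk1 uses plain 2PC so that its primary lock is the one later
// examined by check_txn_status.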

// Prepare a ts collision (currentTxn.StartTS == lastTxn.CommitTS on the same key).
// Loop until we successfully prepare one.
var lastCommitTS uint64
for constructionIters := 0; ; constructionIters++ {
// Reset the value which might have been updated in the previous attempt.
tkPrepare1.MustExec("update t set v = 10 where id = 1")

// Update row 1 in async commit mode
require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))
tkPrepare1Ch := make(chan struct{})
go func() {
tkPrepare1.MustExec("update t set v = v + 1 where id = 1")
tkPrepare1Ch <- struct{}{}
}()

// tkPrepare2 updates TiKV's max_ts by reading. Assuming tkPrepare2's read happens just before tk1's BEGIN,
// we expect tk1 to have startTS == tkPrepare2.startTS + 1, so that tk1.startTS == TiKV's min_commit_ts.
tkPrepare2.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
tk1.MustExec("begin optimistic")

require.NoError(t, failpoint.Disable("tikvclient/beforePrewrite"))
select {
case <-tkPrepare1Ch:
case <-time.After(time.Second):
require.Fail(t, "tkPrepare1 not resumed after unsetting failpoint")
}
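// Compare the commit_ts of tkPrepare1's async-commit transaction with tk1's start_ts;
// the construction succeeds only when they are equal.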

var err error
lastCommitTS, err = strconv.ParseUint(tkPrepare1.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
currentStartTS, err := strconv.ParseUint(tk1.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
if currentStartTS == lastCommitTS {
break
}
// Abandon and retry.
tk1.MustExec("rollback")
if constructionIters >= 1000 {
require.Fail(t, "failed to construct the ts collision situation of async commit transaction")
}
}

// Now tk1 is in a transaction whose start ts collides with the commit ts of a previously committed transaction
// that has written row 1. The ts is in variable `lastCommitTS`.

tk1.MustExec("update t set v = v + 100 where id = 1")
tk1.MustExec("update t set v = v + 100 where id = 2")
tk1.MustExec("update t2 set v = v + 1 where id = 1")

// We will construct the following committing procedure for the transaction in tk1:
// 1. It successfully prewrites all keys but fails to receive the response of the request that prewrites the primary
// (by simulating an RPC error);
// 2. tk2 tries to access keys that were already locked by tk1, and performs resolve-locks. When the issue exists,
// the primary may be rolled back without any rollback record.
// 3. tk1 continues and retries prewriting the primary. In normal cases this should not succeed, as the transaction
// should have been rolled back by tk2's resolve-locks operation, but with the issue present it succeeds.
// To simulate this commit procedure for tk1, we use the onRPCFinishedHook failpoint and inject a hook when
// committing that makes the first prewrite on tk1's primary fail and blocks until signaled by the channel
// `continueCommittingSignalCh`.
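// About the failpoints enabled below (their effect is inferred from their names and from
// how the hook uses them): twoPCShortLockTTL and doNotKeepAlive make tk1's locks expire
// quickly and prevent them from being kept alive, so tk2's check_txn_status can treat
// them as abandoned, and twoPCRequestBatchSizeLimit forces each prewrite request to carry
// a single mutation, which lets the hook below single out the request that prewrites only
// the primary key.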

require.NoError(t, failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
require.NoError(t, failpoint.Enable("tikvclient/doNotKeepAlive", "return"))
require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
require.NoError(t, failpoint.Enable("tikvclient/onRPCFinishedHook", "return"))

defer func() {
require.NoError(t, failpoint.Disable("tikvclient/twoPCShortLockTTL"))
require.NoError(t, failpoint.Disable("tikvclient/doNotKeepAlive"))
require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
require.NoError(t, failpoint.Disable("tikvclient/onRPCFinishedHook"))
}()

continueCommittingSignalCh := make(chan struct{})

primaryReqCount := 0
onRPCFinishedHook := func(req *tikvrpc.Request, resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
if req.Type == tikvrpc.CmdPrewrite {
prewriteReq := req.Prewrite()
// The failpoint "twoPCRequestBatchSizeLimit" must take effect so that each prewrite request carries exactly one mutation
require.Equal(t, 1, len(prewriteReq.GetMutations()))
if prewriteReq.GetStartVersion() == lastCommitTS &&
bytes.Equal(prewriteReq.GetMutations()[0].Key, prewriteReq.PrimaryLock) &&
isRequestHandled(resp, err) {
primaryReqCount++
if primaryReqCount == 1 {
// Block until signaled
<-continueCommittingSignalCh
// Simulate RPC failure (but TiKV successfully handled the request) for the first attempt
return nil, errors.New("injected rpc error in onRPCFinishedHook")
}
}
}
return resp, err
}

ctxWithHook := context.WithValue(context.Background(), "onRPCFinishedHook", onRPCFinishedHook)

resCh := make(chan error)
go func() {
_, err := tk1.ExecWithContext(ctxWithHook, "commit")
resCh <- err
}()
// tk1 must be blocked by the hook function.
select {
case err := <-resCh:
require.Fail(t, "tk1 not blocked, result: "+fmt.Sprintf("%+q", err))
case <-time.After(time.Millisecond * 50):
}

// tk2 conflicts with tk1 and rolls back tk1 by resolving locks.
tk2.MustExec("update t set v = v + 1 where id = 2")
tk2.MustExec("insert into t2 values (2, 11)")
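// Both statements touch keys locked by tk1: row 2 of t and the unique-index key of t2
// that tk1's update points at. Resolving those locks makes tk2 call check_txn_status on
// tk1's primary lock, which TiKV has already written via the prewrite whose response was
// dropped, and roll the transaction back.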

// tk1 must still be blocked
select {
case err := <-resCh:
require.Fail(t, "tk1 not blocked, result: "+fmt.Sprintf("%+q", err))
case <-time.After(time.Millisecond * 50):
}

// Signal tk1 to continue: it retries the prewrite request and proceeds with committing.
close(continueCommittingSignalCh)

var err error
select {
case err = <-resCh:
case <-time.After(time.Second):
require.Fail(t, "tk1 not resumed")
}

require.Error(t, err)
require.Equal(t, errno.ErrWriteConflict, int(errors.Cause(err).(*errors.Error).Code()))
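// tk1's writes must not be visible: row 1 of t keeps the value 11 written by tkPrepare1,
// row 2 reflects only tk2's +1, and t2 keeps (1, 10) plus tk2's newly inserted (2, 11).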
tk2.MustQuery("select * from t order by id").Check(testkit.Rows("1 11", "2 21"))
tk2.MustExec("admin check table t2")
tk2.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 10", "2 11"))
}
>>>>>>> 3da9e0489b7 (txn: Add test case to cover the case that check_txn_status on primary lock of primary txn may affect transaction correctness (#51667))