From 89643b0e8c9ee8cbd0fce4f264a8765c5665db17 Mon Sep 17 00:00:00 2001 From: Bin Zhang <42753323+b6g@users.noreply.github.com> Date: Tue, 26 Nov 2024 23:15:26 -0800 Subject: [PATCH] Export commitTS of KVTxn (#1489) Signed-off-by: Bin Zhang --- integration_tests/1pc_test.go | 20 ++++++++++---------- integration_tests/2pc_test.go | 24 ++++++++++++------------ integration_tests/assertion_test.go | 2 +- integration_tests/async_commit_test.go | 6 +++--- integration_tests/isolation_test.go | 2 +- integration_tests/lock_test.go | 12 ++++++------ txnkv/transaction/test_probe.go | 5 ----- txnkv/transaction/txn.go | 5 +++++ 8 files changed, 38 insertions(+), 38 deletions(-) diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index de4db18d2d..c69bbdbcfe 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -112,12 +112,12 @@ func (s *testOnePCSuite) Test1PC() { s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) // Check keys are committed with the same version - s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3) - s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4) - s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5) - s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3) - s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4) - s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5) + s.mustGetFromSnapshot(txn.CommitTS(), k3, v3) + s.mustGetFromSnapshot(txn.CommitTS(), k4, v4) + s.mustGetFromSnapshot(txn.CommitTS(), k5, v5) + s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k3) + s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k4) + s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k5) // Overwriting in MVCC v5New := []byte("v5new") @@ -129,8 +129,8 @@ func (s *testOnePCSuite) Test1PC() { s.True(txn.GetCommitter().IsOnePC()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) - s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New) - s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5) + s.mustGetFromSnapshot(txn.CommitTS(), k5, v5New) + s.mustGetFromSnapshot(txn.CommitTS()-1, k5, v5) // Check all keys keys := [][]byte{k1, k2, k3, k4, k5} @@ -175,8 +175,8 @@ func (s *testOnePCSuite) Test1PCIsolation() { s.mustGetFromTxn(txn2, k, v1) s.Nil(txn2.Rollback()) - s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2) - s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1) + s.mustGetFromSnapshot(txn.CommitTS(), k, v2) + s.mustGetFromSnapshot(txn.CommitTS()-1, k, v1) } func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 08de42e0a7..fec8cfb6e5 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1076,7 +1076,7 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() { txn.StartAggressiveLocking() s.Nil(txn0.Commit(context.Background())) - s.Greater(txn0.GetCommitTS(), txn.StartTS()) + s.Greater(txn0.CommitTS(), txn.StartTS()) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} if checkExistence { @@ -1087,9 +1087,9 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() { } s.Nil(txn.LockKeys(context.Background(), lockCtx, key)) - s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn0.CommitTS(), lockCtx.MaxLockedWithConflictTS) v := lockCtx.Values[string(key)] - s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS) + s.Equal(txn0.CommitTS(), v.LockedWithConflictTS) s.True(v.Exists) s.Equal(value, v.Value) @@ -1269,12 +1269,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{}) lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} s.NoError(insertPessimisticLock(lockCtx, "k8")) - s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) - s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) + s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) // Update forUpdateTS to simulate a pessimistic retry. newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.Nil(err) - s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS()) + s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS()) lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} mustAlreadyExist(insertPessimisticLock(lockCtx, "k7")) s.NoError(insertPessimisticLock(lockCtx, "k8")) @@ -1291,7 +1291,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() { txn0 := s.begin() s.NoError(txn0.Set([]byte("k1"), []byte("v1"))) s.NoError(txn0.Commit(context.Background())) - txn0CommitTS := txn0.GetCommitTS() + txn0CommitTS := txn0.CommitTS() txn.StartAggressiveLocking() lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true} @@ -1312,7 +1312,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() { txn0 = s.begin() s.NoError(txn0.Delete([]byte("k1"))) s.NoError(txn0.Commit(context.Background())) - txn0CommitTS = txn0.GetCommitTS() + txn0CommitTS = txn0.CommitTS() txn.StartAggressiveLocking() lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true} @@ -1461,13 +1461,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) if firstAttemptLockedWithConflict { - s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) - s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS) - s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS) + s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.CommitTS(), lockCtx.Values["k1"].LockedWithConflictTS) + s.Equal(txn2.CommitTS(), lockCtx.Values["k2"].LockedWithConflictTS) } if firstAttemptLockedWithConflict { - forUpdateTS = txn2.GetCommitTS() + 1 + forUpdateTS = txn2.CommitTS() + 1 } else { forUpdateTS++ } diff --git a/integration_tests/assertion_test.go b/integration_tests/assertion_test.go index 73f901b73c..f5699d53e2 100644 --- a/integration_tests/assertion_test.go +++ b/integration_tests/assertion_test.go @@ -81,7 +81,7 @@ func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic boo err = prepareTxn.Commit(context.Background()) s.Nil(err) prepareStartTS := prepareTxn.GetCommitter().GetStartTS() - prepareCommitTS := prepareTxn.GetCommitTS() + prepareCommitTS := prepareTxn.CommitTS() // A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only // the last key. diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 5410984322..4c478f45a9 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -112,7 +112,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool) s.Nil(err) err = txn.Commit(context.Background()) s.Nil(err) - return txn.StartTS(), txn.GetCommitTS() + return txn.StartTS(), txn.CommitTS() } func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) { @@ -440,8 +440,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() { s.Nil(err) err = t1.Commit(ctx) s.Nil(err) - commitTS1 := t1.GetCommitTS() - commitTS2 := t2.GetCommitTS() + commitTS1 := t1.CommitTS() + commitTS2 := t2.CommitTS() s.Less(commitTS2, commitTS1) } diff --git a/integration_tests/isolation_test.go b/integration_tests/isolation_test.go index 254fb45088..f8fcadfb21 100644 --- a/integration_tests/isolation_test.go +++ b/integration_tests/isolation_test.go @@ -96,7 +96,7 @@ func (s *testIsolationSuite) SetWithRetry(k, v []byte) writeRecord { if err == nil { return writeRecord{ startTS: txn.StartTS(), - commitTS: txn.GetCommitTS(), + commitTS: txn.CommitTS(), } } } diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index 65d04263f4..0019027849 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -142,7 +142,7 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) { s.Nil(err) err = txn.Commit(context.Background()) s.Nil(err) - return txn.StartTS(), txn.GetCommitTS() + return txn.StartTS(), txn.CommitTS() } func (s *testLockSuite) prepareAlphabetLocks() { @@ -1311,7 +1311,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { info := simulatedTxn.GetAggressiveLockingKeysInfo() s.Equal(1, len(info)) s.Equal(k1, info[0].Key()) - s.Equal(txn2.GetCommitTS(), info[0].ActualLockForUpdateTS()) + s.Equal(txn2.CommitTS(), info[0].ActualLockForUpdateTS()) simulatedTxn.DoneAggressiveLocking(context.Background()) defer func() { @@ -1324,12 +1324,12 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { s.Error(err) s.Regexp("[pP]essimistic ?[lL]ock ?[nN]ot ?[fF]ound", err.Error()) - snapshot := s.store.GetSnapshot(txn2.GetCommitTS()) + snapshot := s.store.GetSnapshot(txn2.CommitTS()) v, err := snapshot.Get(context.Background(), k1) s.NoError(err) s.Equal(v2, v) - snapshot = s.store.GetSnapshot(txn2.GetCommitTS() - 1) + snapshot = s.store.GetSnapshot(txn2.CommitTS() - 1) _, err = snapshot.Get(context.Background(), k1) s.Equal(tikverr.ErrNotExist, err) @@ -1369,7 +1369,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { s.NoError(txn.Commit(ctx)) - snapshot = s.store.GetSnapshot(txn.GetCommitTS()) + snapshot = s.store.GetSnapshot(txn.CommitTS()) v, err = snapshot.Get(context.Background(), k2) s.NoError(err) s.Equal(v1, v) @@ -1377,7 +1377,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { s.NoError(err) s.Equal(v1, v) - snapshot = s.store.GetSnapshot(txn.GetCommitTS() - 1) + snapshot = s.store.GetSnapshot(txn.CommitTS() - 1) v, err = snapshot.Get(context.Background(), k2) s.NoError(err) s.Equal(v2, v) diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index b782a3a40c..7e0a582e01 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -38,11 +38,6 @@ func (txn TxnProbe) SetStartTS(ts uint64) { txn.startTS = ts } -// GetCommitTS returns the commit ts. -func (txn TxnProbe) GetCommitTS() uint64 { - return txn.commitTS -} - // GetUnionStore returns transaction's embedded unionstore. func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore { return txn.us diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 4eaa841127..aeebceb1c9 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1704,6 +1704,11 @@ func (txn *KVTxn) StartTS() uint64 { return txn.startTS } +// CommitTS returns the commit timestamp of the already committed transaction, or zero if it's not committed yet. +func (txn *KVTxn) CommitTS() uint64 { + return txn.commitTS +} + // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. func (txn *KVTxn) Valid() bool {