Skip to content

Commit

Permalink
Export commitTS of KVTxn (#1489)
Browse files Browse the repository at this point in the history
 

Signed-off-by: Bin Zhang <[email protected]>
  • Loading branch information
b6g authored Nov 27, 2024
1 parent 05d115b commit 89643b0
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 38 deletions.
20 changes: 10 additions & 10 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (s *testOnePCSuite) Test1PC() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
// Check keys are committed with the same version
s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5)
s.mustGetFromSnapshot(txn.CommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.CommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k5)

// Overwriting in MVCC
v5New := []byte("v5new")
Expand All @@ -129,8 +129,8 @@ func (s *testOnePCSuite) Test1PC() {
s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.CommitTS()-1, k5, v5)

// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5}
Expand Down Expand Up @@ -175,8 +175,8 @@ func (s *testOnePCSuite) Test1PCIsolation() {
s.mustGetFromTxn(txn2, k, v1)
s.Nil(txn2.Rollback())

s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1)
s.mustGetFromSnapshot(txn.CommitTS(), k, v2)
s.mustGetFromSnapshot(txn.CommitTS()-1, k, v1)
}

func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
Expand Down
24 changes: 12 additions & 12 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
txn.StartAggressiveLocking()

s.Nil(txn0.Commit(context.Background()))
s.Greater(txn0.GetCommitTS(), txn.StartTS())
s.Greater(txn0.CommitTS(), txn.StartTS())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
Expand All @@ -1087,9 +1087,9 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))

s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn0.CommitTS(), lockCtx.MaxLockedWithConflictTS)
v := lockCtx.Values[string(key)]
s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS)
s.Equal(txn0.CommitTS(), v.LockedWithConflictTS)
s.True(v.Exists)
s.Equal(value, v.Value)

Expand Down Expand Up @@ -1269,12 +1269,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(insertPessimisticLock(lockCtx, "k8"))
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
// Update forUpdateTS to simulate a pessimistic retry.
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS())
s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS())
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
mustAlreadyExist(insertPessimisticLock(lockCtx, "k7"))
s.NoError(insertPessimisticLock(lockCtx, "k8"))
Expand All @@ -1291,7 +1291,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 := s.begin()
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS := txn0.GetCommitTS()
txn0CommitTS := txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand All @@ -1312,7 +1312,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 = s.begin()
s.NoError(txn0.Delete([]byte("k1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS = txn0.GetCommitTS()
txn0CommitTS = txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand Down Expand Up @@ -1461,13 +1461,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() {
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))

if firstAttemptLockedWithConflict {
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
}

if firstAttemptLockedWithConflict {
forUpdateTS = txn2.GetCommitTS() + 1
forUpdateTS = txn2.CommitTS() + 1
} else {
forUpdateTS++
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/assertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic boo
err = prepareTxn.Commit(context.Background())
s.Nil(err)
prepareStartTS := prepareTxn.GetCommitter().GetStartTS()
prepareCommitTS := prepareTxn.GetCommitTS()
prepareCommitTS := prepareTxn.CommitTS()

// A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only
// the last key.
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool)
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}

func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
Expand Down Expand Up @@ -440,8 +440,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Nil(err)
err = t1.Commit(ctx)
s.Nil(err)
commitTS1 := t1.GetCommitTS()
commitTS2 := t2.GetCommitTS()
commitTS1 := t1.CommitTS()
commitTS2 := t2.CommitTS()
s.Less(commitTS2, commitTS1)
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testIsolationSuite) SetWithRetry(k, v []byte) writeRecord {
if err == nil {
return writeRecord{
startTS: txn.StartTS(),
commitTS: txn.GetCommitTS(),
commitTS: txn.CommitTS(),
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}

func (s *testLockSuite) prepareAlphabetLocks() {
Expand Down Expand Up @@ -1311,7 +1311,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
info := simulatedTxn.GetAggressiveLockingKeysInfo()
s.Equal(1, len(info))
s.Equal(k1, info[0].Key())
s.Equal(txn2.GetCommitTS(), info[0].ActualLockForUpdateTS())
s.Equal(txn2.CommitTS(), info[0].ActualLockForUpdateTS())

simulatedTxn.DoneAggressiveLocking(context.Background())
defer func() {
Expand All @@ -1324,12 +1324,12 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
s.Error(err)
s.Regexp("[pP]essimistic ?[lL]ock ?[nN]ot ?[fF]ound", err.Error())

snapshot := s.store.GetSnapshot(txn2.GetCommitTS())
snapshot := s.store.GetSnapshot(txn2.CommitTS())
v, err := snapshot.Get(context.Background(), k1)
s.NoError(err)
s.Equal(v2, v)

snapshot = s.store.GetSnapshot(txn2.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn2.CommitTS() - 1)
_, err = snapshot.Get(context.Background(), k1)
s.Equal(tikverr.ErrNotExist, err)

Expand Down Expand Up @@ -1369,15 +1369,15 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {

s.NoError(txn.Commit(ctx))

snapshot = s.store.GetSnapshot(txn.GetCommitTS())
snapshot = s.store.GetSnapshot(txn.CommitTS())
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v1, v)
v, err = snapshot.Get(context.Background(), k3)
s.NoError(err)
s.Equal(v1, v)

snapshot = s.store.GetSnapshot(txn.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn.CommitTS() - 1)
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v2, v)
Expand Down
5 changes: 0 additions & 5 deletions txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ func (txn TxnProbe) SetStartTS(ts uint64) {
txn.startTS = ts
}

// GetCommitTS returns the commit ts.
func (txn TxnProbe) GetCommitTS() uint64 {
return txn.commitTS
}

// GetUnionStore returns transaction's embedded unionstore.
func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore {
return txn.us
Expand Down
5 changes: 5 additions & 0 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,11 @@ func (txn *KVTxn) StartTS() uint64 {
return txn.startTS
}

// CommitTS returns the commit timestamp of the already committed transaction, or zero if it's not committed yet.
func (txn *KVTxn) CommitTS() uint64 {
return txn.commitTS
}

// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
func (txn *KVTxn) Valid() bool {
Expand Down

0 comments on commit 89643b0

Please sign in to comment.