Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into dev/optimize-send-req
Browse files Browse the repository at this point in the history
  • Loading branch information
zyguan committed Dec 18, 2024
2 parents 6bf98cf + 8e0275c commit e661596
Show file tree
Hide file tree
Showing 48 changed files with 1,321 additions and 485 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ jobs:
- name: Checkout TiDB
uses: actions/checkout@v2
with:
repository: pingcap/tidb
repository: MyonKeminta/tidb
ref: m/update-ts-check
path: tidb

- name: Check build
Expand Down
20 changes: 10 additions & 10 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (s *testOnePCSuite) Test1PC() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
// Check keys are committed with the same version
s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5)
s.mustGetFromSnapshot(txn.CommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.CommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k5)

// Overwriting in MVCC
v5New := []byte("v5new")
Expand All @@ -129,8 +129,8 @@ func (s *testOnePCSuite) Test1PC() {
s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.CommitTS()-1, k5, v5)

// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5}
Expand Down Expand Up @@ -175,8 +175,8 @@ func (s *testOnePCSuite) Test1PCIsolation() {
s.mustGetFromTxn(txn2, k, v1)
s.Nil(txn2.Rollback())

s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1)
s.mustGetFromSnapshot(txn.CommitTS(), k, v2)
s.mustGetFromSnapshot(txn.CommitTS()-1, k, v1)
}

func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
Expand Down
218 changes: 206 additions & 12 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
txn.StartAggressiveLocking()

s.Nil(txn0.Commit(context.Background()))
s.Greater(txn0.GetCommitTS(), txn.StartTS())
s.Greater(txn0.CommitTS(), txn.StartTS())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
Expand All @@ -1087,9 +1087,9 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))

s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn0.CommitTS(), lockCtx.MaxLockedWithConflictTS)
v := lockCtx.Values[string(key)]
s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS)
s.Equal(txn0.CommitTS(), v.LockedWithConflictTS)
s.True(v.Exists)
s.Equal(value, v.Value)

Expand Down Expand Up @@ -1269,12 +1269,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(insertPessimisticLock(lockCtx, "k8"))
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
// Update forUpdateTS to simulate a pessimistic retry.
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS())
s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS())
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
mustAlreadyExist(insertPessimisticLock(lockCtx, "k7"))
s.NoError(insertPessimisticLock(lockCtx, "k8"))
Expand All @@ -1291,7 +1291,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 := s.begin()
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS := txn0.GetCommitTS()
txn0CommitTS := txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand All @@ -1312,7 +1312,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 = s.begin()
s.NoError(txn0.Delete([]byte("k1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS = txn0.GetCommitTS()
txn0CommitTS = txn0.CommitTS()

txn.StartAggressiveLocking()
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
Expand Down Expand Up @@ -1461,13 +1461,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() {
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))

if firstAttemptLockedWithConflict {
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
}

if firstAttemptLockedWithConflict {
forUpdateTS = txn2.GetCommitTS() + 1
forUpdateTS = txn2.CommitTS() + 1
} else {
forUpdateTS++
}
Expand Down Expand Up @@ -1520,6 +1520,200 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() {
s.NoError(txn.Rollback())
}

func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
// Not blocked
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())

txn.CancelAggressiveLocking(context.Background())
s.False(txn.IsInAggressiveLockingMode())
s.False(txn.GetCommitter().IsTTLRunning())

// End the transaction to test the next case.
s.NoError(txn.Rollback())

// txn blocked by txn2
txn = s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())

txn2 := s.begin()
txn2.SetPessimistic(true)
lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1")))

lockResCh := make(chan error)
go func() {
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
}()
// No immediate result as blocked by txn2
select {
case <-time.After(time.Millisecond * 100):
case err := <-lockResCh:
s.FailNowf("get lock result when expected to be blocked", "error: %+v", err)
}

s.NoError(txn2.Set([]byte("k1"), []byte("v1")))
s.NoError(txn2.Commit(context.Background()))

// txn is resumed
select {
case <-time.After(time.Second):
s.FailNow("txn not resumed after blocker is committed")
case err := <-lockResCh:
s.NoError(err)
}

s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS())

s.True(txn.GetCommitter().IsTTLRunning())

txn.RetryAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())

// Get a new ts as the new forUpdateTS.
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())

txn.CancelAggressiveLocking(context.Background())
s.False(txn.GetCommitter().IsTTLRunning())
s.Zero(txn.GetLockedCount())

// End the test.
s.NoError(txn.Rollback())
}

type aggressiveLockingExitPhase int

const (
exitOnEnterAggressiveLocking aggressiveLockingExitPhase = iota
exitOnFirstLockKeys
exitOnRetry
exitOnSecondLockKeys
)

func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done bool, exitPhase aggressiveLockingExitPhase, retryDifferentKey bool) {
s.T().Logf("testing subcase, done-or-cancel: %v, exitPhase: %v, retryDifferentKey: %v", done, exitPhase, retryDifferentKey)
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())
defer func() {
s.NoError(txn.Rollback())
}()

if exitPhase == exitOnEnterAggressiveLocking {
if done {
txn.DoneAggressiveLocking(context.Background())
} else {
txn.CancelAggressiveLocking(context.Background())
}
s.False(txn.IsInAggressiveLockingMode())
s.Zero(txn.GetLockedCount())
s.True(txn.GetCommitter().IsNil())
return
}

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())

if exitPhase == exitOnFirstLockKeys {
if done {
txn.DoneAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())
} else {
txn.CancelAggressiveLocking(context.Background())
s.False(txn.GetCommitter().IsTTLRunning())
s.Zero(txn.GetLockedCount())
}
s.False(txn.IsInAggressiveLockingMode())
return
}

txn.RetryAggressiveLocking(context.Background())

if exitPhase == exitOnRetry {
if done {
txn.DoneAggressiveLocking(context.Background())
} else {
txn.CancelAggressiveLocking(context.Background())
}
s.False(txn.IsInAggressiveLockingMode())
s.Zero(txn.GetLockedCount())
s.False(txn.GetCommitter().IsTTLRunning())
return
}

forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
key := []byte("k1")
if retryDifferentKey {
key = []byte("k2")
}
s.NoError(txn.LockKeys(context.Background(), lockCtx, key))
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(key, txn.GetCommitter().GetPrimaryKey())
expectedLockCount := 1
if retryDifferentKey {
// When lock k2 during retry, the previously-locked k1 is not immediately released, and it will be released when
// the fair locking state switches (either retry/cancel/done).
expectedLockCount = 2
}
s.Equal(expectedLockCount, txn.GetLockedCount())

if exitPhase == exitOnSecondLockKeys {
if done {
txn.DoneAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())
} else {
txn.CancelAggressiveLocking(context.Background())
s.False(txn.GetCommitter().IsTTLRunning())
s.Zero(txn.GetLockedCount())
}
s.False(txn.IsInAggressiveLockingMode())
return
}

s.FailNow("unreachable")
}

func (s *testCommitterSuite) TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit() {
// Done or cancel
for _, done := range []bool{false, true} {
// Iterate exiting phase
for _, exitPhase := range []aggressiveLockingExitPhase{
exitOnEnterAggressiveLocking,
exitOnFirstLockKeys,
exitOnRetry,
exitOnSecondLockKeys,
} {
for _, retryDifferentKey := range []bool{false, true} {
s.testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done, exitPhase, retryDifferentKey)
}
}
}
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/assertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic boo
err = prepareTxn.Commit(context.Background())
s.Nil(err)
prepareStartTS := prepareTxn.GetCommitter().GetStartTS()
prepareCommitTS := prepareTxn.GetCommitTS()
prepareCommitTS := prepareTxn.CommitTS()

// A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only
// the last key.
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool)
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}

func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
Expand Down Expand Up @@ -440,8 +440,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Nil(err)
err = t1.Commit(ctx)
s.Nil(err)
commitTS1 := t1.GetCommitTS()
commitTS2 := t2.GetCommitTS()
commitTS1 := t1.CommitTS()
commitTS2 := t2.CommitTS()
s.Less(commitTS2, commitTS1)
}

Expand Down
Loading

0 comments on commit e661596

Please sign in to comment.