Skip to content

Commit

Permalink
Fix the problem that ttlManager may stop working if no need to lock a…
Browse files Browse the repository at this point in the history
…fter retry aggressive locking (#1522)

 

Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta authored Dec 17, 2024
1 parent 43d2db7 commit 8e0275c
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 7 deletions.
194 changes: 194 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,200 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() {
s.NoError(txn.Rollback())
}

// TestAggressiveLockingResetTTLManager checks whether the committer's ttlManager is running at each step of an
// aggressive locking procedure. It covers two scenarios: the lock is acquired without being blocked and then
// cancelled; and the lock is first blocked by another transaction, acquired after that transaction commits,
// retried with a newer forUpdateTS, and finally cancelled.
func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
	// Not blocked
	txn := s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())

	lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())

	txn.CancelAggressiveLocking(context.Background())
	s.False(txn.IsInAggressiveLockingMode())
	s.False(txn.GetCommitter().IsTTLRunning())

	// End the transaction to test the next case.
	s.NoError(txn.Rollback())

	// txn blocked by txn2
	txn = s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())

	txn2 := s.begin()
	txn2.SetPessimistic(true)
	lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1")))

	// The channel has a one-slot buffer so that the locking goroutine can always deliver its result and exit,
	// even if this test has already aborted via FailNow/FailNowf and nobody receives from the channel anymore.
	lockResCh := make(chan error, 1)
	go func() {
		lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
		lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
	}()
	// No immediate result as blocked by txn2
	select {
	case <-time.After(time.Millisecond * 100):
	case err := <-lockResCh:
		s.FailNowf("get lock result when expected to be blocked", "error: %+v", err)
	}

	s.NoError(txn2.Set([]byte("k1"), []byte("v1")))
	s.NoError(txn2.Commit(context.Background()))

	// txn is resumed
	select {
	case <-time.After(time.Second):
		s.FailNow("txn not resumed after blocker is committed")
	case err := <-lockResCh:
		s.NoError(err)
	}

	// lockCtx was replaced inside the goroutine; receiving from lockResCh above orders that write before
	// the reads below.
	s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
	s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS())

	s.True(txn.GetCommitter().IsTTLRunning())

	// Retrying must keep the ttlManager running.
	txn.RetryAggressiveLocking(context.Background())
	s.True(txn.GetCommitter().IsTTLRunning())

	// Get a new ts as the new forUpdateTS.
	forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.NoError(err)
	lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())

	txn.CancelAggressiveLocking(context.Background())
	s.False(txn.GetCommitter().IsTTLRunning())
	s.Zero(txn.GetLockedCount())

	// End the test.
	s.NoError(txn.Rollback())
}

// aggressiveLockingExitPhase selects the point at which
// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl leaves aggressive locking mode
// (by either DoneAggressiveLocking or CancelAggressiveLocking).
type aggressiveLockingExitPhase int

const (
// exitOnEnterAggressiveLocking exits right after StartAggressiveLocking, before any key is locked.
exitOnEnterAggressiveLocking aggressiveLockingExitPhase = iota
// exitOnFirstLockKeys exits after the first LockKeys call.
exitOnFirstLockKeys
// exitOnRetry exits right after RetryAggressiveLocking, before locking again.
exitOnRetry
// exitOnSecondLockKeys exits after the LockKeys call that follows the retry.
exitOnSecondLockKeys
)

// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl runs a single subcase: it starts aggressive locking
// on a fresh pessimistic transaction, walks through lock -> retry -> lock again, and leaves aggressive locking
// mode at the step selected by exitPhase. The transaction is rolled back when the function returns.
//
//   - done selects how the mode is left: DoneAggressiveLocking when true, CancelAggressiveLocking when false.
//   - exitPhase selects the step after which the mode is left.
//   - retryDifferentKey makes the post-retry LockKeys call lock "k2" instead of "k1".
func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done bool, exitPhase aggressiveLockingExitPhase, retryDifferentKey bool) {
	s.T().Logf("testing subcase, done-or-cancel: %v, exitPhase: %v, retryDifferentKey: %v", done, exitPhase, retryDifferentKey)

	ctx := context.Background()

	txn := s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())
	defer func() {
		s.NoError(txn.Rollback())
	}()

	// leave exits aggressive locking mode in the way requested by done.
	leave := func() {
		if done {
			txn.DoneAggressiveLocking(ctx)
			return
		}
		txn.CancelAggressiveLocking(ctx)
	}

	// leaveHoldingLock exits right after a LockKeys call: done is expected to keep one lock with the
	// ttlManager running, while cancel is expected to drop every lock and stop the ttlManager.
	leaveHoldingLock := func() {
		leave()
		if done {
			s.True(txn.GetCommitter().IsTTLRunning())
			s.Equal(1, txn.GetLockedCount())
		} else {
			s.False(txn.GetCommitter().IsTTLRunning())
			s.Zero(txn.GetLockedCount())
		}
		s.False(txn.IsInAggressiveLockingMode())
	}

	if exitPhase == exitOnEnterAggressiveLocking {
		// Nothing was locked, so the committer must never have been created.
		leave()
		s.False(txn.IsInAggressiveLockingMode())
		s.Zero(txn.GetLockedCount())
		s.True(txn.GetCommitter().IsNil())
		return
	}

	firstLockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(ctx, firstLockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())
	s.Equal(1, txn.GetLockedCount())

	if exitPhase == exitOnFirstLockKeys {
		leaveHoldingLock()
		return
	}

	txn.RetryAggressiveLocking(ctx)

	if exitPhase == exitOnRetry {
		// Exiting right after a retry must leave no lock and no running ttlManager, for both done and cancel.
		leave()
		s.False(txn.IsInAggressiveLockingMode())
		s.Zero(txn.GetLockedCount())
		s.False(txn.GetCommitter().IsTTLRunning())
		return
	}

	forUpdateTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.NoError(err)
	retryLockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}

	key, expectedLockCount := []byte("k1"), 1
	if retryDifferentKey {
		// When lock k2 during retry, the previously-locked k1 is not immediately released, and it will be
		// released when the fair locking state switches (either retry/cancel/done).
		key, expectedLockCount = []byte("k2"), 2
	}

	s.NoError(txn.LockKeys(ctx, retryLockCtx, key))
	s.True(txn.GetCommitter().IsTTLRunning())
	s.Equal(key, txn.GetCommitter().GetPrimaryKey())
	s.Equal(expectedLockCount, txn.GetLockedCount())

	if exitPhase == exitOnSecondLockKeys {
		leaveHoldingLock()
		return
	}

	s.FailNow("unreachable")
}

// TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit runs
// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl for every combination of exit method
// (cancel/done), exit phase, and whether the retry locks a different key.
func (s *testCommitterSuite) TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit() {
	// false: exit by cancel, true: exit by done.
	doneChoices := []bool{false, true}
	exitPhases := []aggressiveLockingExitPhase{
		exitOnEnterAggressiveLocking,
		exitOnFirstLockKeys,
		exitOnRetry,
		exitOnSecondLockKeys,
	}
	retryDifferentKeyChoices := []bool{false, true}

	for _, done := range doneChoices {
		for _, phase := range exitPhases {
			for _, differentKey := range retryDifferentKeyChoices {
				s.testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done, phase, differentKey)
			}
		}
	}
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
5 changes: 5 additions & 0 deletions txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ type CommitterProbe struct {
*twoPhaseCommitter
}

// IsNil returns whether the internal twoPhaseCommitter is nil.
func (c CommitterProbe) IsNil() bool {
return c.twoPhaseCommitter == nil
}

// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations(context.Background())
Expand Down
55 changes: 48 additions & 7 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,8 +1006,12 @@ func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) {
}
txn.cleanupAggressiveLockingRedundantLocks(ctx)
if txn.aggressiveLockingContext.assignedPrimaryKey {
txn.resetPrimary()
txn.aggressiveLockingContext.assignedPrimaryKey = false
txn.aggressiveLockingContext.lastAssignedPrimaryKey = true
// Do not reset the ttlManager immediately. Instead, we
// will handle the case specially (see KVTxn.resetTTLManagerForAggressiveLockingMode).
// See: https://github.com/pingcap/tidb/issues/58279
txn.resetPrimary(true)
}

txn.aggressiveLockingContext.lastPrimaryKey = txn.aggressiveLockingContext.primaryKey
Expand Down Expand Up @@ -1039,8 +1043,8 @@ func (txn *KVTxn) CancelAggressiveLocking(ctx context.Context) {
}()

txn.cleanupAggressiveLockingRedundantLocks(context.Background())
if txn.aggressiveLockingContext.assignedPrimaryKey {
txn.resetPrimary()
if txn.aggressiveLockingContext.assignedPrimaryKey || txn.aggressiveLockingContext.lastAssignedPrimaryKey {
txn.resetPrimary(false)
txn.aggressiveLockingContext.assignedPrimaryKey = false
}

Expand Down Expand Up @@ -1075,6 +1079,12 @@ func (txn *KVTxn) DoneAggressiveLocking(ctx context.Context) {
txn.aggressiveLockingContext = nil
}()

// If no key ends up locked and the ttlManager was started during the current fair locking procedure, reset the
// ttlManager, as no key will be the primary.
if txn.aggressiveLockingContext.lastAssignedPrimaryKey && !txn.aggressiveLockingContext.assignedPrimaryKey {
txn.committer.ttlManager.reset()
}

txn.cleanupAggressiveLockingRedundantLocks(context.Background())

if txn.forUpdateTSChecks == nil {
Expand Down Expand Up @@ -1347,7 +1357,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
}
// It can't transform LockOnlyIfExists mode to normal mode. If so, it can add a lock to a key
// which doesn't exist in tikv. TiDB should ensure that primary key must be set when it sends
// a LockOnlyIfExists pessmistic lock request.
// a LockOnlyIfExists pessimistic lock request.
if (txn.committer == nil || txn.committer.primaryKey == nil) && len(keys) > 1 {
return &tikverr.ErrLockOnlyIfExistsNoPrimaryKey{
StartTS: txn.startTS,
Expand Down Expand Up @@ -1400,6 +1410,8 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
metrics.AggressiveLockedKeysDerived.Add(float64(filteredAggressiveLockedKeysCount))
metrics.AggressiveLockedKeysNew.Add(float64(len(keys)))

txn.resetTTLManagerForAggressiveLockingMode(len(keys) != 0)

if len(keys) == 0 {
if lockCtx.Stats != nil {
txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode)
Expand Down Expand Up @@ -1484,7 +1496,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
}
if assignedPrimaryKey {
// unset the primary key and stop heartbeat if we assigned primary key when failed to lock it.
txn.resetPrimary()
txn.resetPrimary(false)
}
return err
}
Expand Down Expand Up @@ -1559,9 +1571,37 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
return nil
}

func (txn *KVTxn) resetPrimary() {
// resetPrimary resets the primary. It's used when the first LockKeys call in a transaction fails, or needs to be
// rolled back for some reason (e.g. TiDB may perform statement rollback), in which case the primary will be unlocked
// and another key may be chosen as the new primary. If keepTTLManager is true, the ttlManager is left as is.
func (txn *KVTxn) resetPrimary(keepTTLManager bool) {
txn.committer.primaryKey = nil
txn.committer.ttlManager.reset()
if !keepTTLManager {
txn.committer.ttlManager.reset()
}
}

// resetTTLManagerForAggressiveLockingMode resets the ttlManager during a fair locking procedure when the primary
// key has changed and there is something new to lock.
//
// It is only meant to be called from within a LockKeys invocation. hasNewLockToAcquire tells whether any key
// still needs to be locked in the current LockKeys call, after filtering out the keys that were already locked
// before the most recent RetryAggressiveLocking.
//
// This is not the only path on which fair locking resets the ttlManager: DoneAggressiveLocking may also stop it
// if no key is locked after exiting.
func (txn *KVTxn) resetTTLManagerForAggressiveLockingMode(hasNewLockToAcquire bool) {
	// Outside a fair locking context this function is not supposed to be called at all. Inside one, when
	// there's no new lock to acquire, the primary key is assumed to be unchanged and the ttlManager keeps
	// running.
	if !txn.IsInAggressiveLockingMode() || !hasNewLockToAcquire {
		return
	}

	lockingCtx := txn.aggressiveLockingContext
	if bytes.Equal(lockingCtx.primaryKey, lockingCtx.lastPrimaryKey) {
		// Same primary key: keep the ttlManager running. Sending the acquirePessimisticLock requests will
		// call ttlManager.run() again, but it's reentrant and will do nothing as the ttlManager is already
		// running.
		return
	}

	// The primary key is changed, so the ttlManager will need to run on the new primary key instead. Reset it.
	txn.committer.ttlManager.reset()
}

func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
Expand Down Expand Up @@ -1595,6 +1635,7 @@ func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
type aggressiveLockingContext struct {
lastRetryUnnecessaryLocks map[string]tempLockBufferEntry
lastPrimaryKey []byte
lastAssignedPrimaryKey bool
lastAttemptStartTime time.Time

currentLockedKeys map[string]tempLockBufferEntry
Expand Down

0 comments on commit 8e0275c

Please sign in to comment.