Skip to content

Commit

Permalink
Add more tests; fix a potential leak
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Dec 16, 2024
1 parent eb114cb commit 26a7805
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
117 changes: 117 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,123 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
s.NoError(txn.Rollback())
}

type aggressiveLockingExitPhase int

const (
exitOnEnterAggressiveLocking aggressiveLockingExitPhase = iota
exitOnFirstLockKeys
exitOnRetry
exitOnSecondLockKeys
)

func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done bool, exitPhase aggressiveLockingExitPhase, retryDifferentKey bool) {
s.T().Logf("testing subcase, done-or-cancel: %v, exitPhase: %v, retryDifferentKey: %v", done, exitPhase, retryDifferentKey)
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
s.True(txn.IsInAggressiveLockingMode())
s.True(txn.GetCommitter().IsNil())
defer func() {
s.NoError(txn.Rollback())
}()

if exitPhase == exitOnEnterAggressiveLocking {
if done {
txn.DoneAggressiveLocking(context.Background())
} else {
txn.CancelAggressiveLocking(context.Background())
}
s.False(txn.IsInAggressiveLockingMode())
s.Zero(txn.GetLockedCount())
s.True(txn.GetCommitter().IsNil())
return
}

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())

if exitPhase == exitOnFirstLockKeys {
if done {
txn.DoneAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())
} else {
txn.CancelAggressiveLocking(context.Background())
s.False(txn.GetCommitter().IsTTLRunning())
s.Zero(txn.GetLockedCount())
}
s.False(txn.IsInAggressiveLockingMode())
return
}

txn.RetryAggressiveLocking(context.Background())

if exitPhase == exitOnRetry {
if done {
txn.DoneAggressiveLocking(context.Background())
} else {
txn.CancelAggressiveLocking(context.Background())
}
s.False(txn.IsInAggressiveLockingMode())
s.Zero(txn.GetLockedCount())
s.False(txn.GetCommitter().IsTTLRunning())
return
}

forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
key := []byte("k1")
if retryDifferentKey {
key = []byte("k2")
}
s.NoError(txn.LockKeys(context.Background(), lockCtx, key))
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(key, txn.GetCommitter().GetPrimaryKey())
expectedLockCount := 1
if retryDifferentKey {
// When lock k2 during retry, the previously-locked k1 is not immediately released, and it will be released when
// the fair locking state switches (either retry/cancel/done).
expectedLockCount = 2
}
s.Equal(expectedLockCount, txn.GetLockedCount())

if exitPhase == exitOnSecondLockKeys {
if done {
txn.DoneAggressiveLocking(context.Background())
s.True(txn.GetCommitter().IsTTLRunning())
s.Equal(1, txn.GetLockedCount())
} else {
txn.CancelAggressiveLocking(context.Background())
s.False(txn.GetCommitter().IsTTLRunning())
s.Zero(txn.GetLockedCount())
}
s.False(txn.IsInAggressiveLockingMode())
return
}

s.FailNow("unreachable")
}

func (s *testCommitterSuite) TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit() {
// Done or cancel
for _, done := range []bool{false, true} {
// Iterate exiting phase
for _, exitPhase := range []aggressiveLockingExitPhase{
exitOnEnterAggressiveLocking,
exitOnFirstLockKeys,
exitOnRetry,
exitOnSecondLockKeys,
} {
for _, retryDifferentKey := range []bool{false, true} {
s.testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done, exitPhase, retryDifferentKey)
}
}
}
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
8 changes: 8 additions & 0 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,12 @@ func (txn *KVTxn) DoneAggressiveLocking(ctx context.Context) {
txn.aggressiveLockingContext = nil
}()

// If finally no key locked and ttlManager is just started during the current fair locking procedure, reset the
// ttlManager as no key will be the primary.
if txn.aggressiveLockingContext.lastAssignedPrimaryKey && !txn.aggressiveLockingContext.assignedPrimaryKey {
txn.committer.ttlManager.reset()
}

txn.cleanupAggressiveLockingRedundantLocks(context.Background())

if txn.forUpdateTSChecks == nil {
Expand Down Expand Up @@ -1579,6 +1585,8 @@ func (txn *KVTxn) resetPrimary(keepTTLManager bool) {
// This function is only used during the LockKeys invocation, and the parameter noNewLockToAcquire indicates whether
// there are any key needs to be locked in the current LockKeys call, after filtering out those that has already been
// locked before the most recent RetryAggressiveLocking.
// Also note that this function is not the only path that fair locking resets the ttlManager. DoneAggressiveLocking may
// also stop the ttlManager if no key is locked after exiting.
func (txn *KVTxn) resetTTLManagerForAggressiveLockingMode(noNewLockToAcquire bool) {
if !txn.IsInAggressiveLockingMode() {
// Not supposed to be called in a non fair locking context
Expand Down

0 comments on commit 26a7805

Please sign in to comment.