Skip to content

Commit

Permalink
feat: pipelined dml has its own max-txn-ttl of 24 hours
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Mar 13, 2024
1 parent d59fea5 commit 8142bf2
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
16 changes: 12 additions & 4 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type twoPhaseCommitAction interface {
// Global variable set by config file.
var (
ManagedLockTTL uint64 = 20000 // 20s
MaxPipelinedTxnTTL uint64 = 24 * 60 * 60 * 1000 // 24h
)

var (
Expand Down Expand Up @@ -1145,7 +1146,7 @@ type ttlManager struct {
lockCtx *kv.LockCtx
}

func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx, isPipelinedTxn bool) {
if _, err := util.EvalFailpoint("doNotKeepAlive"); err == nil {
return
}
Expand All @@ -1157,7 +1158,7 @@ func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
tm.ch = make(chan struct{})
tm.lockCtx = lockCtx

go keepAlive(c, tm.ch, c.primary(), lockCtx)
go keepAlive(c, tm.ch, c.primary(), lockCtx, isPipelinedTxn)
}

func (tm *ttlManager) close() {
Expand All @@ -1178,7 +1179,10 @@ const keepAliveMaxBackoff = 20000
const pessimisticLockMaxBackoff = 20000
const maxConsecutiveFailure = 10

func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, lockCtx *kv.LockCtx) {
func keepAlive(
c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte,
lockCtx *kv.LockCtx, isPipelinedTxn bool,
) {
// Ticker is set to 1/2 of the ManagedLockTTL.
ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2)
defer ticker.Stop()
Expand All @@ -1205,7 +1209,11 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
if uptime > config.GetGlobalConfig().MaxTxnTTL {
maxTtl := config.GetGlobalConfig().MaxTxnTTL
if isPipelinedTxn {
maxTtl = max(maxTtl, MaxPipelinedTxnTTL)
}
if uptime > maxTtl {
// Checks maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.Logger(bo.GetCtx()).Info("ttlManager live up to its lifetime",
Expand Down
4 changes: 2 additions & 2 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
c.run(c, action.LockCtx)
c.run(c, action.LockCtx, false)
}

// Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`.
Expand Down Expand Up @@ -412,7 +412,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
len(lockResp.Results) > 0 &&
lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
c.run(c, action.LockCtx, false)
}

if len(lockResp.Results) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (action actionPipelinedFlush) handleSingleBatch(

if batch.isPrimary {
// start keepalive after primary key is written.
c.run(c, nil)
c.run(c, nil, true)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (action actionPrewrite) handleSingleBatch(
// In this case 1PC is not expected to be used, but still check it for safety.
if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
prewriteResp.OnePcCommitTs == 0 {
c.run(c, nil)
c.run(c, nil, false)
}
}

Expand Down

0 comments on commit 8142bf2

Please sign in to comment.