diff --git a/config/client.go b/config/client.go index b29c7b91e..941f2fb7c 100644 --- a/config/client.go +++ b/config/client.go @@ -90,6 +90,9 @@ type TiKVClient struct { // RemoteCoprocessorAddr is the address of the remote coprocessor. RemoteCoprocessorAddr string `toml:"remote-coprocessor-addr" json:"remote-coprocessor-addr"` + + // TxnChunkWriterAddr is the address of the txn chunk writer. + TxnChunkWriterAddr string `toml:"txn-chunk-writer-addr" json:"txn-chunk-writer-addr"` } // AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable. diff --git a/go.mod b/go.mod index 35efba1c2..376f46645 100644 --- a/go.mod +++ b/go.mod @@ -59,3 +59,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/pingcap/kvproto => github.com/tidbcloud/kvproto v0.0.0-20240117032603-369bbbf7b45e diff --git a/go.sum b/go.sum index 744f15daa..f3f166086 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240314010430-d552d2f77da9 h1:tAfzWZi+hVqX8tM+jnltSiC2k3X+305L/aGiblhX6dk= +github.com/pingcap/kvproto v0.0.0-20240314010430-d552d2f77da9/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 7d5ec2eb2..784f0f819 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -195,6 +195,8 @@ type twoPhaseCommitter struct { isInternal bool forUpdateTSConstraints map[string]uint64 + + txnFileCtx txnFileCtx } type memBufferMutations struct { @@ -734,6 +736,9 @@ func (c *twoPhaseCommitter) primary() []byte { if c.mutations != nil { return c.mutations.GetKey(0) } + if c.txnFileCtx.slice.len() > 0 { + return c.txnFileCtx.slice.chunkRanges[0].smallest + } return nil } return c.primaryKey @@ -917,7 +922,7 @@ func (c *twoPhaseCommitter) preSplitRegion(ctx context.Context, group groupedMut // CommitSecondaryMaxBackoff is max sleep time of the 'commit' command const CommitSecondaryMaxBackoff = 41000 -// doActionOnGroupedMutations splits groups into batches (there is one group per region, and potentially many batches per group, but all mutations +// doActionOnGroupMutations splits groups into batches (there is one group per region, and potentially many batches per group, but all mutations // in a batch will belong to the same region).
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error { action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups))) @@ -1215,7 +1220,7 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat", zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL)) startTime := time.Now() - _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL) + _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL, c.txnFileCtx.slice.len() > 0) if err != nil { keepFail++ metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) @@ -1237,11 +1242,12 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l } } -func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { +func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64, isTxnFile bool) (newTTL uint64, stopHeartBeat bool, err error) { req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{ PrimaryLock: primary, StartVersion: startTS, AdviseLockTtl: ttl, + IsTxnFile: isTxnFile, }) for { loc, err := store.GetRegionCache().LocateKey(bo, primary) diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index f35017f84..cc9201bab 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -371,7 +371,7 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error { // SendTxnHeartBeat renews a txn's ttl. func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { - return sendTxnHeartBeat(bo, store, primary, startTS, ttl) + return sendTxnHeartBeat(bo, store, primary, startTS, ttl, false) } // ConfigProbe exposes configurations and global variables for testing purpose. diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 94ef35964..f77469d25 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -154,7 +154,7 @@ type KVTxn struct { interceptor interceptor.RPCInterceptor assertionLevel kvrpcpb.AssertionLevel *util.RequestSource - // resourceGroupName is the name of tenant resource group. + // resourceGroupName is the name of the tenant resource group. resourceGroupName string aggressiveLockingContext *aggressiveLockingContext @@ -296,7 +296,7 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { txn.GetSnapshot().SetResourceGroupTagger(tagger) } -// SetResourceGroupName set resource group name for both read and write. +// SetResourceGroupName sets the resource group name for both read and write. func (txn *KVTxn) SetResourceGroupName(name string) { txn.resourceGroupName = name txn.GetSnapshot().SetResourceGroupName(name) @@ -502,6 +502,13 @@ func (txn *KVTxn) Commit(ctx context.Context) error { } } }() + if committer.useTxnFile() { + err = committer.executeTxnFile(ctx) + if val == nil || sessionID > 0 { + txn.onCommitted(err) + } + return err + } // latches disabled // pessimistic transaction should also bypass latch.
if txn.store.TxnLatches() == nil || txn.IsPessimistic() { diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go new file mode 100644 index 000000000..e0b48d7ae --- /dev/null +++ b/txnkv/transaction/txn_file.go @@ -0,0 +1,577 @@ +package transaction + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pkg/errors" + "github.com/tikv/client-go/v2/config" + tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/locate" + "github.com/tikv/client-go/v2/internal/logutil" + "github.com/tikv/client-go/v2/internal/retry" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/txnkv/txnlock" + atomicutil "go.uber.org/atomic" + "go.uber.org/zap" + "hash/crc32" + "io" + "net/http" + "strconv" +) + +var ( + // BuildTxnFileMaxBackoff is max sleep time to build TxnFile. + BuildTxnFileMaxBackoff = atomicutil.NewUint64(60000) +) + +const MaxTxnChunkSize = 128 * 1024 * 1024 +const PreSplitRegionChunks = 4 + +type txnFileCtx struct { + slice txnChunkSlice +} + +type chunkBatch struct { + txnChunkSlice + region *locate.Region + isPrimary bool +} + +type txnChunkSlice struct { + chunkIDs []uint64 + chunkRanges []txnChunkRange +} + +func (cs *txnChunkSlice) appendSlice(other *txnChunkSlice) { + if cs.len() == 0 { + cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs...) + cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges...) + return + } + lastChunkRange := cs.chunkRanges[len(cs.chunkIDs)-1] + for i, chunkRange := range other.chunkRanges { + if bytes.Compare(lastChunkRange.smallest, chunkRange.smallest) < 0 { + cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs[i:]...) + cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges[i:]...) 
+ break + } + } + return +} + +func (cs *txnChunkSlice) append(chunkID uint64, chunkRange txnChunkRange) { + cs.chunkIDs = append(cs.chunkIDs, chunkID) + cs.chunkRanges = append(cs.chunkRanges, chunkRange) +} + +func (cs *txnChunkSlice) len() int { + return len(cs.chunkIDs) +} + +func (cs *txnChunkSlice) slice(i, j int) txnChunkSlice { + return txnChunkSlice{ + chunkIDs: cs.chunkIDs[i:j], + chunkRanges: cs.chunkRanges[i:j], + } +} + +func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer) ([]chunkBatch, error) { + var batches []chunkBatch + endKey := kv.NextKey(cs.chunkRanges[cs.len()-1].biggest) + regions, err := c.LoadRegionsInKeyRange(bo, cs.chunkRanges[0].smallest, endKey) + if err != nil { + return nil, err + } + for _, region := range regions { + batch := chunkBatch{ + region: region, + } + for i, chunkRange := range cs.chunkRanges { + if chunkRange.overlapRegion(region) { + batch.append(cs.chunkIDs[i], chunkRange) + } + } + if batch.len() > 0 { + batches = append(batches, batch) + } + } + return batches, nil +} + +type txnChunkRange struct { + smallest []byte + biggest []byte +} + +func newTxnChunkRange(smallest []byte, biggest []byte) txnChunkRange { + return txnChunkRange{ + smallest: smallest, + biggest: biggest, + } +} + +func (r *txnChunkRange) overlapRegion(region *locate.Region) bool { + return bytes.Compare(r.smallest, region.EndKey()) < 0 && bytes.Compare(r.biggest, region.StartKey()) >= 0 +} + +type txnFileAction interface { + executeBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch chunkBatch) (*tikvrpc.Response, error) + onPrimarySuccess(c *twoPhaseCommitter) + extractKeyError(resp *tikvrpc.Response) *kvrpcpb.KeyError +} + +type txnFilePrewriteAction struct{} + +func (a txnFilePrewriteAction) executeBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch chunkBatch) (*tikvrpc.Response, error) { + primaryLock := c.txnFileCtx.slice.chunkRanges[0].smallest + req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{ + StartVersion: c.startTS, + PrimaryLock: primaryLock, + LockTtl: c.lockTTL, + MaxCommitTs: c.maxCommitTS, + AssertionLevel: kvrpcpb.AssertionLevel_Off, + TxnFileChunks: batch.chunkIDs, + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + DiskFullOpt: c.diskFullOpt, + TxnSource: c.txnSource, + MaxExecutionDurationMs: uint64(client.ReadTimeoutMedium.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: c.resourceGroupName, + }, + }) + sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient()) + var resolvingRecordToken *int + for { + resp, _, err := sender.SendReq(bo, req, batch.region.VerID(), client.ReadTimeoutMedium) + if err != nil { + return nil, err + } + if resp.Resp == nil { + return nil, errors.WithStack(tikverr.ErrBodyMissing) + } + prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse) + keyErrs := prewriteResp.GetErrors() + if len(keyErrs) == 0 { + return resp, nil + } + var locks []*txnlock.Lock + for _, keyErr := range keyErrs { + // Check already exists error + if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { + e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist} + return nil, c.extractKeyExistsErr(e) + } + + // Extract lock from key error + lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) + if err1 != nil { + return nil, err1 + } + logutil.BgLogger().Info( + "prewrite txn file encounters lock", + 
zap.Uint64("session", c.sessionID), + zap.Uint64("txnID", c.startTS), + zap.Stringer("lock", lock), + ) + // If an optimistic transaction encounters a lock with larger TS, this transaction will certainly + // fail due to a WriteConflict error. So we can construct and return an error here early. + // Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock, + // TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise, + // TiKV returns lock.TTL = 0, and we still need to resolve the lock. + if lock.TxnID > c.startTS { + return nil, tikverr.NewErrWriteConflictWithArgs( + c.startTS, + lock.TxnID, + 0, + lock.Key, + kvrpcpb.WriteConflict_Optimistic, + ) + } + locks = append(locks, lock) + } + if resolvingRecordToken == nil { + token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS) + resolvingRecordToken = &token + defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken) + } else { + c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken) + } + resolveLockOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: c.startTS, + Locks: locks, + Detail: &c.getDetail().ResolveLock, + } + resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts) + if err != nil { + return nil, err + } + msBeforeExpired := resolveLockRes.TTL + if msBeforeExpired > 0 { + err = bo.BackoffWithCfgAndMaxSleep( + retry.BoTxnLock, + int(msBeforeExpired), + errors.Errorf("2PC txn file prewrite lockedKeys: %d", len(locks)), + ) + if err != nil { + return nil, err + } + } + } +} + +func (a txnFilePrewriteAction) onPrimarySuccess(c *twoPhaseCommitter) { + c.run(c, nil) +} + +func (a txnFilePrewriteAction) extractKeyError(resp *tikvrpc.Response) *kvrpcpb.KeyError { + prewriteResp, _ := resp.Resp.(*kvrpcpb.PrewriteResponse) + errs := prewriteResp.GetErrors() + if len(errs) > 0 { + return errs[0] + } + return nil +} + +type txnFileCommitAction struct { + commitTS uint64 +} + +func (a txnFileCommitAction) executeBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch chunkBatch) (*tikvrpc.Response, error) { + req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{ + StartVersion: c.startTS, + CommitVersion: a.commitTS, + IsTxnFile: true, + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + DiskFullOpt: c.diskFullOpt, + TxnSource: c.txnSource, + MaxExecutionDurationMs: uint64(client.ReadTimeoutMedium.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: c.resourceGroupName, + }, + }) + sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient()) + for { + resp, _, err := sender.SendReq(bo, req, batch.region.VerID(), client.ReadTimeoutMedium) + if batch.isPrimary && sender.GetRPCError() != nil { + c.setUndeterminedErr(errors.WithStack(sender.GetRPCError())) + } + // Unexpected error occurs, return it. 
+ if err != nil { + return nil, err + } + if resp.Resp == nil { + return nil, errors.WithStack(tikverr.ErrBodyMissing) + } + commitResp := resp.Resp.(*kvrpcpb.CommitResponse) + if keyErr := commitResp.GetError(); keyErr != nil { + if rejected := keyErr.GetCommitTsExpired(); rejected != nil { + logutil.Logger(bo.GetCtx()).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS", + zap.Uint64("txnStartTS", c.startTS), + zap.Stringer("info", logutil.Hex(rejected))) + + // Do not retry for a txn which has a too large MinCommitTs + // 3600000 << 18 = 943718400000 + if rejected.MinCommitTs-rejected.AttemptedCommitTs > 943718400000 { + return nil, errors.Errorf("2PC MinCommitTS is too large, we got MinCommitTS: %d, and AttemptedCommitTS: %d", + rejected.MinCommitTs, rejected.AttemptedCommitTs) + } + + // Update commit ts and retry. + commitTS, err1 := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + if err1 != nil { + logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed", + zap.Error(err1), + zap.Uint64("txnStartTS", c.startTS)) + return nil, err1 + } + + c.mu.Lock() + c.commitTS = commitTS + c.mu.Unlock() + // Update the commitTS of the request and retry. + req.Commit().CommitVersion = commitTS + continue + } + return nil, tikverr.ExtractKeyErr(keyErr) + } + return resp, nil + } +} + +func (a txnFileCommitAction) onPrimarySuccess(c *twoPhaseCommitter) { + c.mu.Lock() + c.mu.committed = true + c.mu.Unlock() +} + +func (a txnFileCommitAction) extractKeyError(resp *tikvrpc.Response) *kvrpcpb.KeyError { + commitResp, _ := resp.Resp.(*kvrpcpb.CommitResponse) + return commitResp.GetError() +} + +type txnFileRollbackAction struct{} + +func (a txnFileRollbackAction) executeBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch chunkBatch) (*tikvrpc.Response, error) { + req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{ + StartVersion: c.startTS, + IsTxnFile: true, + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + DiskFullOpt: c.diskFullOpt, + TxnSource: c.txnSource, + MaxExecutionDurationMs: uint64(client.ReadTimeoutShort.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + ResourceControlContext: &kvrpcpb.ResourceControlContext{ + ResourceGroupName: c.resourceGroupName, + }, + }) + sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient()) + resp, _, err1 := sender.SendReq(bo, req, batch.region.VerID(), client.ReadTimeoutShort) + if err1 != nil { + return nil, err1 + } + return resp, nil +} + +func (a txnFileRollbackAction) onPrimarySuccess(_ *twoPhaseCommitter) { +} + +func (a txnFileRollbackAction) extractKeyError(resp *tikvrpc.Response) *kvrpcpb.KeyError { + rollbackResp, _ := resp.Resp.(*kvrpcpb.BatchRollbackResponse) + return rollbackResp.GetError() +} + +func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { + defer func() { + // Always clean up all written keys if the txn does not commit. 
+ c.mu.RLock() + committed := c.mu.committed + undetermined := c.mu.undeterminedErr != nil + c.mu.RUnlock() + if !committed && !undetermined { + err1 := c.executeTxnFileActionWithRetry(retry.NewBackofferWithVars(ctx, int(PrewriteMaxBackoff.Load()), c.txn.vars), c.txnFileCtx.slice, txnFileRollbackAction{}) + if err1 != nil { + logutil.BgLogger().Error("executeTxnFileActionWithRetry failed", zap.Error(err1)) + } + metrics.TwoPCTxnCounterError.Inc() + } else { + metrics.TwoPCTxnCounterOk.Inc() + } + c.txn.commitTS = c.commitTS + }() + + bo := retry.NewBackofferWithVars(ctx, int(PrewriteMaxBackoff.Load()), c.txn.vars) + if err = c.buildTxnFiles(bo, c.mutations); err != nil { + return + } + if err = c.preSplitTxnFileRegions(bo); err != nil { + return + } + err = c.executeTxnFileActionWithRetry(bo, c.txnFileCtx.slice, txnFilePrewriteAction{}) + if err != nil { + return + } + c.commitTS, err = c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + if err != nil { + return + } + err = c.executeTxnFileActionWithRetry(bo, c.txnFileCtx.slice, txnFileCommitAction{commitTS: c.commitTS}) + return +} + +func (c *twoPhaseCommitter) executeTxnFileAction(bo *retry.Backoffer, chunkSlice txnChunkSlice, action txnFileAction) (txnChunkSlice, error) { + var regionErrChunks txnChunkSlice + batches, err := chunkSlice.groupToBatches(c.store.GetRegionCache(), bo) + if err != nil { + return regionErrChunks, err + } + firstBatch := batches[0] + if firstBatch.region.Contains(c.primary()) { + firstBatch.isPrimary = true + resp, err := action.executeBatch(c, bo, firstBatch) + if err != nil { + return regionErrChunks, err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return regionErrChunks, err + } + if regionErr != nil { + return chunkSlice, nil + } + action.onPrimarySuccess(c) + batches = batches[1:] + } + for _, batch := range batches { + resp, err1 := action.executeBatch(c, bo, batch) + if err1 != nil { + return regionErrChunks, err1 + } + if keyErr := action.extractKeyError(resp); keyErr != nil { + if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { + e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist} + return regionErrChunks, c.extractKeyExistsErr(e) + } + lock, err2 := txnlock.ExtractLockFromKeyErr(keyErr) + if err2 != nil { + return regionErrChunks, err2 + } + if lock.TxnID > c.startTS { + return regionErrChunks, tikverr.NewErrWriteConflictWithArgs( + c.startTS, + lock.TxnID, + 0, + lock.Key, + kvrpcpb.WriteConflict_Optimistic, + ) + } + + } + regionErr, err1 := resp.GetRegionError() + if err1 != nil { + return regionErrChunks, err1 + } + if regionErr.GetEpochNotMatch() != nil { + regionErrChunks.appendSlice(&batch.txnChunkSlice) + continue + } + } + return regionErrChunks, nil +} + +func (c *twoPhaseCommitter) executeTxnFileActionWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, action txnFileAction) error { + currentChunks := chunkSlice + for { + var regionErrChunks txnChunkSlice + regionErrChunks, err := c.executeTxnFileAction(bo, currentChunks, action) + if err != nil { + return err + } + if regionErrChunks.len() == 0 { + return nil + } + currentChunks = regionErrChunks + err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("txnFile region miss")) + if err != nil { + return err + } + } +} + +func (c *twoPhaseCommitter) buildTxnFiles(bo *retry.Backoffer, mutations CommitterMutations) error { + capacity := c.txn.Size() + c.txn.Len()*7 + 4 + if capacity > MaxTxnChunkSize { + capacity = MaxTxnChunkSize + } + writerAddr := 
config.GetGlobalConfig().TiKVClient.TxnChunkWriterAddr + buf := make([]byte, 0, capacity) + chunkSmallest := mutations.GetKey(0) + for i := 0; i < mutations.Len(); i++ { + key := mutations.GetKey(i) + op := mutations.GetOp(i) + val := mutations.GetValue(i) + entrySize := 2 + len(key) + 1 + 4 + len(val) + if len(buf)+entrySize+4 > cap(buf) { + chunkID, err := c.buildTxnFile(bo, writerAddr, buf) + if err != nil { + return err + } + ran := newTxnChunkRange(chunkSmallest, mutations.GetKey(i-1)) + c.txnFileCtx.slice.append(chunkID, ran) + chunkSmallest = key + buf = buf[:0] + } + buf = binary.LittleEndian.AppendUint16(buf, uint16(len(key))) + buf = append(buf, key...) + buf = append(buf, byte(op)) + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(val))) + buf = append(buf, val...) + } + if len(buf) > 0 { + chunkID, err := c.buildTxnFile(bo, writerAddr, buf) + if err != nil { + return err + } + ran := newTxnChunkRange(chunkSmallest, mutations.GetKey(mutations.Len()-1)) + c.txnFileCtx.slice.append(chunkID, ran) + } + return nil +} + +func (c *twoPhaseCommitter) buildTxnFile(bo *retry.Backoffer, writerAddr string, buf []byte) (uint64, error) { + hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + hash.Write(buf) + crc := hash.Sum32() + buf = binary.LittleEndian.AppendUint32(buf, crc) + logutil.BgLogger().Info("build txn file size", zap.Int("size", len(buf))) + url := fmt.Sprintf("http://%s/txn_chunk", writerAddr) + resp, err := http.Post(url, "application/octet-stream", bytes.NewReader(buf)) + if err != nil { + return 0, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("http status %s", resp.Status) + } + data, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + return strconv.ParseUint(string(data), 10, 64) +} + +func (c *twoPhaseCommitter) useTxnFile() bool { + if c.txn.isPessimistic || c.txn.GetMemBuffer().Size() < 16*1024*1024 { + return false + } + conf := config.GetGlobalConfig() + return len(conf.TiKVClient.TxnChunkWriterAddr) > 0 +} + +func (c *twoPhaseCommitter) preSplitTxnFileRegions(bo *retry.Backoffer) error { + for { + batches, err := c.txnFileCtx.slice.groupToBatches(c.store.GetRegionCache(), bo) + if err != nil { + err = bo.Backoff(retry.BoRegionMiss, err) + if err != nil { + return err + } + continue + } + var splitKeys [][]byte + for _, batch := range batches { + if batch.len() > PreSplitRegionChunks { + for i := PreSplitRegionChunks; i < batch.len(); i += PreSplitRegionChunks { + splitKeys = append(splitKeys, batch.chunkRanges[i].smallest) + } + } + } + if len(splitKeys) == 0 { + return nil + } + _, err = c.store.SplitRegions(bo.GetCtx(), splitKeys, false, nil) + if err != nil { + logutil.BgLogger().Warn("txn file pre-split region failed", zap.Error(err)) + } + err = bo.Backoff(retry.BoRegionMiss, err) + if err != nil { + return err + } + } +} diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index b93b86eb4..b27fc0d89 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -171,6 +171,7 @@ type Lock struct { UseAsyncCommit bool LockForUpdateTS uint64 MinCommitTS uint64 + IsTxnFile bool } func (l *Lock) String() string { @@ -179,8 +180,8 @@ func (l *Lock) String() string { buf.WriteString(hex.EncodeToString(l.Key)) buf.WriteString(", primary: ") buf.WriteString(hex.EncodeToString(l.Primary)) - return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t, txnSize: %d", - buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS,
l.TTL, l.LockType, l.UseAsyncCommit, l.TxnSize) + return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t, txnSize: %d, isTxnFile: %t", + buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit, l.TxnSize, l.IsTxnFile) } // NewLock creates a new *Lock. @@ -195,6 +196,7 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock { UseAsyncCommit: l.UseAsyncCommit, LockForUpdateTS: l.LockForUpdateTs, MinCommitTS: l.MinCommitTs, + IsTxnFile: l.IsTxnFile, } } @@ -236,6 +238,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo expiredLocks := locks txnInfos := make(map[uint64]uint64) + txnFileIDs := make(map[uint64]bool) startTime := time.Now() for _, l := range expiredLocks { logutil.Logger(bo.GetCtx()).Debug("BatchResolveLocks handling lock", zap.Stringer("lock", l)) @@ -292,6 +295,9 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo } txnInfos[l.TxnID] = status.commitTS + if l.IsTxnFile { + txnFileIDs[l.TxnID] = true + } } logutil.BgLogger().Info("BatchResolveLocks: lookup txn status", zap.Duration("cost time", time.Since(startTime)), @@ -300,8 +306,9 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo listTxnInfos := make([]*kvrpcpb.TxnInfo, 0, len(txnInfos)) for txnID, status := range txnInfos { listTxnInfos = append(listTxnInfos, &kvrpcpb.TxnInfo{ - Txn: txnID, - Status: status, + Txn: txnID, + Status: status, + IsTxnFile: txnFileIDs[txnID], }) } @@ -747,6 +754,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary ForceSyncCommit: forceSyncCommit, ResolvingPessimisticLock: resolvingPessimisticLock, VerifyIsPrimary: true, + IsTxnFile: lockInfo.IsTxnFile, }, kvrpcpb.Context{ RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceControlContext: &kvrpcpb.ResourceControlContext{ @@ -1053,6 +1061,7 @@ func (lr *LockResolver) checkAllSecondaries(bo *retry.Backoffer, l *Lock, status func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region locate.RegionVerID, keys [][]byte, status TxnStatus) error { lreq := &kvrpcpb.ResolveLockRequest{ StartVersion: l.TxnID, + IsTxnFile: l.IsTxnFile, } if status.IsCommitted() { lreq.CommitVersion = status.CommitTS()
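Note on the chunk payload assembled by buildTxnFiles/buildTxnFile above: each mutation is encoded as a 2-byte little-endian key length, the key bytes, a 1-byte op code, a 4-byte little-endian value length and the value bytes, and a CRC32 (Castagnoli) of the whole payload is appended before it is POSTed to http://<writer-addr>/txn_chunk. The standalone Go sketch below mirrors that layout for illustration only; the encodeChunk helper and the sample keys are hypothetical, and the layout is inferred from this patch rather than from a documented wire format.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// encodeChunk mirrors the per-mutation layout used in buildTxnFiles:
// keyLen (u16 LE) | key | op (1 byte) | valLen (u32 LE) | value, repeated,
// followed by a CRC32-Castagnoli checksum of the payload as in buildTxnFile.
func encodeChunk(keys, values [][]byte, ops []byte) []byte {
	var buf []byte
	for i := range keys {
		buf = binary.LittleEndian.AppendUint16(buf, uint16(len(keys[i])))
		buf = append(buf, keys[i]...)
		buf = append(buf, ops[i])
		buf = binary.LittleEndian.AppendUint32(buf, uint32(len(values[i])))
		buf = append(buf, values[i]...)
	}
	crc := crc32.Checksum(buf, crc32.MakeTable(crc32.Castagnoli))
	return binary.LittleEndian.AppendUint32(buf, crc)
}

func main() {
	// Two Put mutations (kvrpcpb.Op_Put is 0); keys and values are made up.
	chunk := encodeChunk(
		[][]byte{[]byte("k1"), []byte("k2")},
		[][]byte{[]byte("v1"), []byte("v2")},
		[]byte{0, 0},
	)
	fmt.Printf("encoded chunk: %d bytes\n", len(chunk))
}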