Rebase tidb 7.5 #1220

Closed · wants to merge 23 commits
23 commits
b13f4be
[pick-to-7.5] resource_control: add ru details in ExecDetails (#1070)…
nolouch Feb 18, 2024
fa310f8
add a helper function for new ru details (#1078) (#1159)
nolouch Feb 19, 2024
98ed21b
[Cherry-pick tidb-7.5] Check kill signals (#1158)
ekexium Feb 19, 2024
650ba83
pd client: uprade and fit get region option (#1069)
CabinfeverB Dec 1, 2023
735a4db
resource_control: add ru details in ExecDetails (#1070)
glorv Dec 4, 2023
fd10a6c
prefix safepoint kv with keyspace name (#928)
AmoebaProtozoa Aug 7, 2023
2793ccf
increase large transaction preSplitSizeThreashold (#1059)
coocood Nov 15, 2023
6987fe9
support remote coprocessor (#808)
coocood May 29, 2023
9323046
fix encoding of mvcc get by key (#775)
iosmanthus Apr 20, 2023
a871c05
add const label for metrics (#781)
zeminzhou Apr 26, 2023
cc2bcca
cse: decode batch cop task (#798)
iosmanthus May 12, 2023
5c602d0
Use a new way to create client for v2 (#806)
rleungx May 18, 2023
4b2d364
Count only one tiflash replica in disaggregated mode. (#805)
JinheLin May 22, 2023
56a1de8
Add get min ts implementation (#809)
rleungx May 23, 2023
ceb3412
fix strings import
iosmanthus Dec 21, 2023
f47aa78
run intergration test and unit test in tidb-cse-7.5
iosmanthus Dec 22, 2023
0a0c024
close on canceled (#1121)
pingyu Jan 17, 2024
9c08932
Fix wait recvloop timeout (#1134)
pingyu Jan 27, 2024
fc3bc45
Use correct address for CloseAddr (#1140)
pingyu Jan 27, 2024
ca20cba
tests: Fix timeout of TestResolveTxnFallbackFromAsyncCommit (#1146)
pingyu Jan 29, 2024
dbe9dec
Update pd client for cse (#1199)
rleungx Mar 4, 2024
0a1e45a
Upgrade to the latest pd client (#1217)
rleungx Mar 13, 2024
2683ede
Merge branch 'tidb-cse-7.5' into rebase-tidb-7.5-2
zeminzhou Mar 13, 2024
15 changes: 12 additions & 3 deletions error/error.go
@@ -64,13 +64,14 @@ var (
// ErrTiFlashServerTimeout is the error when tiflash server is timeout.
ErrTiFlashServerTimeout = errors.New("tiflash server timeout")
// ErrQueryInterrupted is the error when the query is interrupted.
ErrQueryInterrupted = errors.New("query interruppted")
// This is deprecated. Keep it only to pass CI :-(. We can remove this later.
ErrQueryInterrupted = errors.New("query interrupted")
// ErrTiKVStaleCommand is the error that the command is stale in tikv.
ErrTiKVStaleCommand = errors.New("tikv stale command")
// ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced.
ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced")
// ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted.
-	ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted")
+	ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set")
// ErrResolveLockTimeout is the error that resolve lock timeout.
ErrResolveLockTimeout = errors.New("resolve lock timeout")
// ErrLockWaitTimeout is the error that wait for the lock is timeout.
@@ -96,11 +97,19 @@ var (
// ErrIsWitness is the error when a request is send to a witness.
ErrIsWitness = errors.New("peer is witness")
// ErrUnknown is the unknow error.
ErrUnknown = errors.New("unknow")
ErrUnknown = errors.New("unknown")
// ErrResultUndetermined is the error when execution result is unknown.
ErrResultUndetermined = errors.New("execution result undetermined")
)

+type ErrQueryInterruptedWithSignal struct {
+	Signal uint32
+}
+
+func (e ErrQueryInterruptedWithSignal) Error() string {
+	return fmt.Sprintf("query interrupted by signal %d", e.Signal)
+}

// MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD.
const MismatchClusterID = "mismatch cluster id"

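Callers that need the concrete kill reason can unwrap the new error type with errors.As. Below is a minimal sketch, not part of the PR: the handleCommitErr helper is illustrative, the tikverr alias mirrors how client-go imports its own error package, and it assumes the errors.WithStack wrapper (pkg/errors >= v0.9) supports errors.As unwrapping.

```go
package main

import (
	"errors"
	"fmt"

	tikverr "github.com/tikv/client-go/v2/error"
)

// handleCommitErr (illustrative) reports the kill signal carried by a commit error, if any.
func handleCommitErr(err error) {
	var killed tikverr.ErrQueryInterruptedWithSignal
	if errors.As(err, &killed) {
		// The signal value is the enum maintained by TiDB (sqlkiller.go); client-go only forwards it.
		fmt.Printf("query was killed, signal=%d\n", killed.Signal)
		return
	}
	if err != nil {
		fmt.Printf("commit failed: %v\n", err)
	}
}

func main() {
	handleCommitErr(tikverr.ErrQueryInterruptedWithSignal{Signal: 2})
}
```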
10 changes: 10 additions & 0 deletions integration_tests/2pc_test.go
@@ -2502,3 +2502,13 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() {
s.True(txn.GetMemBuffer().TryLock())
txn.GetMemBuffer().Unlock()
}

func (s *testCommitterSuite) TestKillSignal() {
txn := s.begin()
err := txn.Set([]byte("key"), []byte("value"))
s.Nil(err)
var killed uint32 = 2
txn.SetVars(kv.NewVariables(&killed))
err = txn.Commit(context.Background())
s.ErrorContains(err, "query interrupted")
}
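The test stores 2 (any non-zero reason) in the shared flag before committing. In real use the flag is shared with the session and flipped from another goroutine; the sketch below shows that pattern under stated assumptions: commitWithKillSwitch and the kill channel are hypothetical, while NewVariables, SetVars, and Commit come from the client-go API used in this PR.

```go
package example

import (
	"context"
	"sync/atomic"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/txnkv/transaction"
)

// commitWithKillSwitch (hypothetical helper) commits txn while allowing another
// goroutine to interrupt it by sending a non-zero kill reason.
func commitWithKillSwitch(ctx context.Context, txn *transaction.KVTxn, kill <-chan uint32) error {
	var killed uint32
	txn.SetVars(kv.NewVariables(&killed))

	go func() {
		// Any non-zero value marks the query as killed; the value is the reason.
		atomic.StoreUint32(&killed, <-kill)
	}()

	// If the flag becomes non-zero while committing, Commit fails with
	// ErrQueryInterruptedWithSignal carrying that reason.
	return txn.Commit(ctx)
}
```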
5 changes: 2 additions & 3 deletions internal/locate/region_request.go
@@ -1483,9 +1483,8 @@ func (s *RegionRequestSender) SendReqCtx(
}

// recheck whether the session/query is killed during the Next()
-	boVars := bo.GetVars()
-	if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 {
-		return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted)
+	if err2 := bo.CheckKilled(); err2 != nil {
+		return nil, nil, retryTimes, err2
}
if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil {
if val.(bool) {
21 changes: 17 additions & 4 deletions internal/retry/backoff.go
@@ -217,10 +217,9 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
atomic.AddInt64(&detail.BackoffCount, 1)
}

-	if b.vars != nil && b.vars.Killed != nil {
-		if atomic.LoadUint32(b.vars.Killed) == 1 {
-			return errors.WithStack(tikverr.ErrQueryInterrupted)
-		}
+	err2 := b.CheckKilled()
+	if err2 != nil {
+		return err2
}

var startTs interface{}
@@ -382,3 +381,17 @@
}
return nil, 0
}

+func (b *Backoffer) CheckKilled() error {
+	if b.vars != nil && b.vars.Killed != nil {
+		killed := atomic.LoadUint32(b.vars.Killed)
+		if killed != 0 {
+			logutil.BgLogger().Info(
+				"backoff stops because a killed signal is received",
+				zap.Uint32("signal", killed),
+			)
+			return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed})
+		}
+	}
+	return nil
+}
4 changes: 4 additions & 0 deletions kv/variables.go
@@ -44,6 +44,10 @@ type Variables struct {

// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
// This is an enum value rather than a boolean. See sqlkiller.go
// in TiDB for its definition.
// When its value is 0, it's not killed
// When its value is not 0, it's killed, the value indicates concrete reason.
Killed *uint32
}

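Since Killed is now an enum rather than a boolean, callers should treat any non-zero value as killed and let only the upper layer clear it, as the 2pc.go change below notes. A small sketch of that convention follows; the reason constants and helper names are hypothetical, since the authoritative values live in TiDB's sqlkiller.go.

```go
package example

import "sync/atomic"

// Hypothetical reason values; the real enum is defined in TiDB's sqlkiller.go.
const (
	notKilled        uint32 = 0
	killedByUser     uint32 = 1
	killedByMemLimit uint32 = 2
)

// killReason returns the current reason and whether the query counts as killed.
func killReason(killed *uint32) (uint32, bool) {
	if killed == nil {
		return notKilled, false
	}
	reason := atomic.LoadUint32(killed)
	return reason, reason != notKilled
}

// clearKill resets the flag once the kill has been handled; per this PR,
// only the upper layer (TiDB) should do this, not client-go itself.
func clearKill(killed *uint32, reason uint32) {
	atomic.CompareAndSwapUint32(killed, reason, notKilled)
}
```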
22 changes: 21 additions & 1 deletion txnkv/transaction/2pc.go
@@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
}

// doActionOnBatches does action to batches in parallel.
-func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
+func (c *twoPhaseCommitter) doActionOnBatches(
+	bo *retry.Backoffer, action twoPhaseCommitAction,
+	batches []batchMutations,
+) error {
+	// killSignal should never be nil for TiDB
+	if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
+		// Do not reset the killed flag here. Let the upper layer reset the flag.
+		// Before it resets, any request is considered valid to be killed.
+		status := atomic.LoadUint32(c.txn.vars.Killed)
+		if status != 0 {
+			logutil.BgLogger().Info(
+				"query is killed", zap.Uint32(
+					"signal",
+					status,
+				),
+			)
+			// TODO: There might be various signals besides a query interruption,
+			// but we are unable to differentiate them, because the definition is in TiDB.
+			return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status})
+		}
+	}
if len(batches) == 0 {
return nil
}
12 changes: 0 additions & 12 deletions txnkv/transaction/pessimistic.go
@@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch(
return nil
}
}

// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
if action.Killed != nil {
// Do not reset the killed flag here!
// actionPessimisticLock runs on each region parallelly, we have to consider that
// the error may be dropped.
if atomic.LoadUint32(action.Killed) == 1 {
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
}
}

6 changes: 0 additions & 6 deletions txnkv/transaction/txn.go
@@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
lockCtx.Stats.Mu.Unlock()
}
if lockCtx.Killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
// We need to reset the killed flag here.
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
}
if txn.IsInAggressiveLockingMode() {
if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
10 changes: 10 additions & 0 deletions util/execdetails.go
@@ -702,6 +702,16 @@ func NewRUDetails() *RUDetails {
}
}

// NewRUDetailsWith creates a new RUDetails with the given values.
// It is used in TiDB's unit tests.
func NewRUDetailsWith(rru, wru float64, waitDur time.Duration) *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(rru),
writeRU: uatomic.NewFloat64(wru),
ruWaitDuration: uatomic.NewDuration(waitDur),
}
}

// Clone implements the RuntimeStats interface.
func (rd *RUDetails) Clone() *RUDetails {
return &RUDetails{
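NewRUDetailsWith lets a TiDB-side unit test build RU details with fixed numbers instead of driving the resource-control interceptor. A hedged usage sketch follows; the RRU, WRU, and RUWaitDuration accessors are assumed from the existing RUDetails API and are not part of this diff.

```go
package util_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/util"
)

// TestNewRUDetailsWith (illustrative) checks that the constructor stores the given values.
func TestNewRUDetailsWith(t *testing.T) {
	rd := util.NewRUDetailsWith(2.5, 1.5, 30*time.Millisecond)
	require.Equal(t, 2.5, rd.RRU())
	require.Equal(t, 1.5, rd.WRU())
	require.Equal(t, 30*time.Millisecond, rd.RUWaitDuration())
}
```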