Skip to content

Commit

Permalink
Implement the switch group action
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Aug 8, 2024
1 parent 2f7e5f3 commit 028d340
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 71 deletions.
322 changes: 304 additions & 18 deletions DEPS.bzl

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(211), session.CurrentBootstrapVersion)
require.Equal(t, int64(212), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/pingcap/tidb

go 1.21

replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20
replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd

require (
cloud.google.com/go/storage v1.38.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,8 @@ github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20 h1:Hxu4QP2WeowrYaqta923Stz3ZVY55QGAzOTzy6QekFI=
github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd h1:y9cKHOkSUX2wiMt7J5IeKNl4aSCuyIF2Y0ah/MuYVhc=
github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
Expand Down
5 changes: 1 addition & 4 deletions pkg/ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
}
// Update the action settings.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action.Type)
runaway.ActionV2 = &rmpb.RunawayActionV2{
Type: runaway.Action,
SwitchGroupName: options.Runaway.Action.SwitchGroupName,
}
runaway.SwitchGroupName = options.Runaway.Action.SwitchGroupName
// Update the watch settings.
if options.Runaway.WatchType != model.WatchNone {
runaway.Watch = &rmpb.RunawayWatch{}
Expand Down
127 changes: 93 additions & 34 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package resourcegroup

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -111,19 +112,50 @@ type QuarantineRecord struct {
ID int64
ResourceGroupName string
// startTime and endTime are in UTC.
StartTime time.Time
EndTime time.Time
Watch rmpb.RunawayWatchType
WatchText string
Source string
Action rmpb.RunawayAction
StartTime time.Time
EndTime time.Time
Watch rmpb.RunawayWatchType
WatchText string
Source string
action rmpb.RunawayAction
switchGroupName string
}

// GetRecordKey is used to get the key in ttl cache.
func (r *QuarantineRecord) GetRecordKey() string {
return r.ResourceGroupName + "/" + r.WatchText
}

// GetAction returns the runaway action type.
func (r *QuarantineRecord) GetAction() rmpb.RunawayAction {
return r.action
}

// GetSwitchGroupName returns the switch group name if the action type is `SWITCH_GROUP`.
func (r *QuarantineRecord) GetSwitchGroupName() string {
if r.action == rmpb.RunawayAction_SwitchGroup {
return r.switchGroupName
}
return ""
}

// GetActionString returns the action string.
func (r *QuarantineRecord) GetActionString() string {
if r == nil {
return rmpb.RunawayAction_NoneAction.String()
}
if r.action == rmpb.RunawayAction_SwitchGroup {
return fmt.Sprintf("%s(%s)", r.action.String(), r.switchGroupName)
}
return r.action.String()
}

// SetAction sets the runaway action fields.
func (r *QuarantineRecord) SetAction(action rmpb.RunawayAction, switchGroupName string) {
r.action = action
r.switchGroupName = switchGroupName
}

func writeInsert(builder *strings.Builder, tableName string) {
builder.WriteString("insert into ")
builder.WriteString(tableName)
Expand All @@ -135,7 +167,7 @@ func (r *QuarantineRecord) GenInsertionStmt() (string, []any) {
var builder strings.Builder
params := make([]any, 0, 6)
writeInsert(&builder, RunawayWatchTableName)
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?)")
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName)
params = append(params, r.StartTime)
if r.EndTime.Equal(NullTime) {
Expand All @@ -146,7 +178,8 @@ func (r *QuarantineRecord) GenInsertionStmt() (string, []any) {
params = append(params, r.Watch)
params = append(params, r.WatchText)
params = append(params, r.Source)
params = append(params, r.Action)
params = append(params, r.GetAction())
params = append(params, r.GetSwitchGroupName())
return builder.String(), params
}

Expand All @@ -155,7 +188,7 @@ func (r *QuarantineRecord) GenInsertionDoneStmt() (string, []any) {
var builder strings.Builder
params := make([]any, 0, 9)
writeInsert(&builder, RunawayWatchDoneTableName)
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?)")
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ID)
params = append(params, r.ResourceGroupName)
params = append(params, r.StartTime)
Expand All @@ -167,7 +200,8 @@ func (r *QuarantineRecord) GenInsertionDoneStmt() (string, []any) {
params = append(params, r.Watch)
params = append(params, r.WatchText)
params = append(params, r.Source)
params = append(params, r.Action)
params = append(params, r.GetAction())
params = append(params, r.GetSwitchGroupName())
params = append(params, time.Now().UTC())
return builder.String(), params
}
Expand Down Expand Up @@ -276,7 +310,11 @@ func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDiges
return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest, startTime)
}

func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, ttl time.Duration, now *time.Time) {
func (rm *RunawayManager) markQuarantine(
resourceGroupName, convict string,
watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, switchGroupName string,
ttl time.Duration, now *time.Time,
) {
var endTime time.Time
if ttl > 0 {
endTime = now.UTC().Add(ttl)
Expand All @@ -288,7 +326,8 @@ func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watc
Watch: watchType,
WatchText: convict,
Source: rm.serverID,
Action: action,
action: action,
switchGroupName: switchGroupName,
}
// Add record without ID into watch list in this TiDB right now.
rm.addWatchList(record, ttl, false)
Expand Down Expand Up @@ -399,7 +438,7 @@ func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord {
return nil
}

func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest string, action string, matchType RunawayMatchType, now *time.Time) {
func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest, actionStr string, matchType RunawayMatchType, now *time.Time) {
source := rm.serverID
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
Expand All @@ -412,7 +451,7 @@ func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest
ResourceGroupName: resourceGroupName,
Time: *now,
Match: matchType.String(),
Action: action,
Action: actionStr,
SQLText: originalSQL,
PlanDigest: planDigest,
Source: source,
Expand Down Expand Up @@ -443,12 +482,12 @@ func (rm *RunawayManager) StaleQuarantineRecordChan() <-chan *QuarantineRecord {
}

// examineWatchList check whether the query is in watch list.
func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict string) (bool, rmpb.RunawayAction) {
func (rm *RunawayManager) examineWatchList(resourceGroupName, convict string) (bool, rmpb.RunawayAction, string) {
item := rm.getWatchFromWatchList(resourceGroupName + "/" + convict)
if item == nil {
return false, 0
return false, 0, ""
}
return true, item.Action
return true, item.GetAction(), item.GetSwitchGroupName()
}

// Stop stops the watchList which is a ttlcache.
Expand All @@ -470,11 +509,15 @@ type RunawayChecker struct {
planDigest string

deadline time.Time
setting *rmpb.RunawaySettings
// settings is used as the fallback runaway settings if there is no other specification.
setting *rmpb.RunawaySettings

markedByRule atomic.Bool
markedByWatch bool
watchAction rmpb.RunawayAction
// action is the specified action for the runaway query.
// If it's not given, the action defined in `groupSettings` will be used.
action rmpb.RunawayAction
switchGroupName string
}

func newRunawayChecker(manager *RunawayManager, resourceGroupName string, setting *rmpb.RunawaySettings, originalSQL, sqlDigest, planDigest string, startTime time.Time) *RunawayChecker {
Expand All @@ -495,34 +538,39 @@ func newRunawayChecker(manager *RunawayManager, resourceGroupName string, settin
}

// BeforeExecutor checks whether query is in watch list before executing and after compiling.
func (r *RunawayChecker) BeforeExecutor() error {
func (r *RunawayChecker) BeforeExecutor() (string, error) {
if r == nil {
return nil
return "", nil
}
for _, convict := range r.getConvictIdentifiers() {
watched, action := r.manager.examineWatchList(r.resourceGroupName, convict)
watched, action, switchGroupName := r.manager.examineWatchList(r.resourceGroupName, convict)
if watched {
// Fallback to use the resource group runaway setting.
if action == rmpb.RunawayAction_NoneAction && r.setting != nil {
action = r.setting.Action
switchGroupName = r.setting.SwitchGroupName
}
r.markedByWatch = true
now := time.Now()
r.watchAction = action
r.markRunaway(RunawayMatchTypeWatch, action, &now)
r.action = action
r.switchGroupName = switchGroupName
r.markRunaway(RunawayMatchTypeWatch, action, switchGroupName, &now)
// If no match action, it will do nothing.
switch action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayQuarantine
return "", exeerrors.ErrResourceGroupQueryRunawayQuarantine
case rmpb.RunawayAction_CoolDown:
// This action should be done in BeforeCopRequest.
return nil
return "", nil
case rmpb.RunawayAction_DryRun:
return nil
return "", nil
case rmpb.RunawayAction_SwitchGroup:
return switchGroupName, nil
default:
}
}
}
return nil
return "", nil
}

// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request.
Expand All @@ -534,7 +582,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
if !marked {
// note: now we don't check whether query is in watch list again.
if r.markedByWatch {
if r.watchAction == rmpb.RunawayAction_CoolDown {
if r.action == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
}
Expand All @@ -552,7 +600,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
}
// execution time exceeds the threshold, mark the query as runaway
if r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, r.setting.SwitchGroupName, &now)
if !r.markedByWatch {
r.markQuarantine(&now)
}
Expand Down Expand Up @@ -580,7 +628,7 @@ func (r *RunawayChecker) CheckCopRespError(err error) error {
if !r.markedByRule.Load() {
now := time.Now()
if r.deadline.Before(now) && r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, r.setting.SwitchGroupName, &now)
if !r.markedByWatch {
r.markQuarantine(&now)
}
Expand All @@ -601,11 +649,22 @@ func (r *RunawayChecker) markQuarantine(now *time.Time) {
}
ttl := time.Duration(r.setting.Watch.LastingDurationMs) * time.Millisecond

r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.setting.Watch.Type, r.setting.Action, ttl, now)
r.manager.markQuarantine(
r.resourceGroupName, r.getSettingConvictIdentifier(),
r.setting.Watch.Type, r.setting.Action, r.setting.SwitchGroupName,
ttl, now,
)
}

func (r *RunawayChecker) markRunaway(matchType RunawayMatchType, action rmpb.RunawayAction, now *time.Time) {
actionStr := strings.ToLower(rmpb.RunawayAction_name[int32(action)])
func (r *RunawayChecker) markRunaway(matchType RunawayMatchType, action rmpb.RunawayAction, switchGroupName string, now *time.Time) {
var actionStr string
switch action {
case rmpb.RunawayAction_NoneAction, rmpb.RunawayAction_DryRun, rmpb.RunawayAction_CoolDown, rmpb.RunawayAction_Kill:
actionStr = action.String()
case rmpb.RunawayAction_SwitchGroup:
actionStr = fmt.Sprintf("%s(%s)", action.String(), switchGroupName)
}
actionStr = strings.ToLower(actionStr)
metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType.String(), actionStr).Inc()
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,9 @@ func getRunawayWatchRecord(exec sqlexec.RestrictedSQLExecutor, reader *SystemTab
Watch: rmpb.RunawayWatchType(r.GetInt64(4)),
WatchText: r.GetString(5),
Source: r.GetString(6),
Action: rmpb.RunawayAction(r.GetInt64(7)),
}
// TODO: initialize the switch group name from the system table also.
qr.SetAction(rmpb.RunawayAction(r.GetInt64(7)), "")
// If a TiDB write record slow, it will occur that the record which has earlier start time is inserted later than others.
// So we start the scan a little earlier.
if push {
Expand Down Expand Up @@ -606,8 +607,9 @@ func getRunawayWatchDoneRecord(exec sqlexec.RestrictedSQLExecutor, reader *Syste
Watch: rmpb.RunawayWatchType(r.GetInt64(5)),
WatchText: r.GetString(6),
Source: r.GetString(7),
Action: rmpb.RunawayAction(r.GetInt64(8)),
}
// TODO: initialize the switch group name from the system table also.
qr.SetAction(rmpb.RunawayAction(r.GetInt64(8)), "")
// Ditto as getRunawayWatchRecord.
if push {
reader.CheckPoint = now.Add(-3 * runawayWatchSyncInterval)
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = rm.DeriveChecker(stmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String(), sessionVars.StartTime)
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
switchGroupName, err := stmtCtx.RunawayChecker.BeforeExecutor()
if err != nil {
return nil, err
}
if len(switchGroupName) > 0 {
stmtCtx.ResourceGroupName = switchGroupName
}
}
ctx = a.observeStmtBeginForTopSQL(ctx)

Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3614,7 +3614,6 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watches := do.GetRunawayWatchList()
rows := make([][]types.Datum, 0, len(watches))
for _, watch := range watches {
action := watch.Action
row := types.MakeDatums(
watch.ID,
watch.ResourceGroupName,
Expand All @@ -3623,7 +3622,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
rmpb.RunawayWatchType_name[int32(watch.Watch)],
watch.WatchText,
watch.Source,
rmpb.RunawayAction_name[int32(action)],
watch.GetActionString(),
)
if watch.EndTime.Equal(resourcegroup.NullTime) {
row[3].SetString("UNLIMITED", mysql.DefaultCollationName)
Expand Down Expand Up @@ -3665,12 +3664,12 @@ func (e *memtableRetriever) setDataFromResourceGroups() error {
}
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
actionType := model.RunawayActionType(setting.ActionV2.GetType())
actionType := model.RunawayActionType(setting.Action)
switch actionType {
case model.RunawayActionDryRun, model.RunawayActionCooldown, model.RunawayActionKill:
fmt.Fprintf(limitBuilder, ", ACTION=%s", actionType.String())
case model.RunawayActionSwitchGroup:
fmt.Fprintf(limitBuilder, ", ACTION=%s(%s)", actionType.String(), setting.ActionV2.GetSwitchGroupName())
fmt.Fprintf(limitBuilder, ", ACTION=%s(%s)", actionType.String(), setting.SwitchGroupName)
default:
}
if setting.Watch != nil {
Expand Down
Loading

0 comments on commit 028d340

Please sign in to comment.