Skip to content

Commit

Permalink
Implement the switch group action
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Aug 19, 2024
1 parent 9e59e7b commit 319c7b1
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 46 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6114,13 +6114,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "4828494251d340d2afb6d66409186c5c20c281b043ef4389edfcd9090627fbf7",
strip_prefix = "github.com/JmPotato/[email protected]20240730032855-30010ef5dc20",
sha256 = "6771bc6ceac87e9f82c728d552003081ee634864727795a8d2f7a3fa53c067df",
strip_prefix = "github.com/JmPotato/[email protected]20240807105224-7e7a2da7bfcd",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240730032855-30010ef5dc20.zip",
"http://ats.apps.svc/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240730032855-30010ef5dc20.zip",
"https://cache.hawkingrei.com/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240730032855-30010ef5dc20.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240730032855-30010ef5dc20.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240807105224-7e7a2da7bfcd.zip",
"http://ats.apps.svc/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240807105224-7e7a2da7bfcd.zip",
"https://cache.hawkingrei.com/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240807105224-7e7a2da7bfcd.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/JmPotato/kvproto/com_github_jmpotato_kvproto-v0.0.0-20240807105224-7e7a2da7bfcd.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(211), session.CurrentBootstrapVersion)
require.Equal(t, int64(212), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/pingcap/tidb

go 1.21

replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20
replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd

require (
cloud.google.com/go/storage v1.38.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,8 @@ github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20 h1:Hxu4QP2WeowrYaqta923Stz3ZVY55QGAzOTzy6QekFI=
github.com/JmPotato/kvproto v0.0.0-20240730032855-30010ef5dc20/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd h1:y9cKHOkSUX2wiMt7J5IeKNl4aSCuyIF2Y0ah/MuYVhc=
github.com/JmPotato/kvproto v0.0.0-20240807105224-7e7a2da7bfcd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
Expand Down
5 changes: 1 addition & 4 deletions pkg/ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
}
// Update the action settings.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action.Type)
runaway.ActionV2 = &rmpb.RunawayActionV2{
Type: runaway.Action,
SwitchGroupName: options.Runaway.Action.SwitchGroupName,
}
runaway.SwitchGroupName = options.Runaway.Action.SwitchGroupName
// Update the watch settings.
if options.Runaway.WatchType != model.WatchNone {
runaway.Watch = &rmpb.RunawayWatch{}
Expand Down
106 changes: 80 additions & 26 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,35 @@ type QuarantineRecord struct {
Watch rmpb.RunawayWatchType
WatchText string
Source string
Action rmpb.RunawayAction
// Action-related fields.
Action rmpb.RunawayAction
SwitchGroupName string
}

// GetRecordKey builds the key used for this record in the ttl cache: the
// resource group name and the watch text joined by a single "/".
func (r *QuarantineRecord) GetRecordKey() string {
	group, text := r.ResourceGroupName, r.WatchText
	return group + "/" + text
}

// GetSwitchGroupName returns the switch group name if the action type is `SWITCH_GROUP`.
// For every other action type, and for a nil record, it returns an empty string,
// so a group name left on a record with a different action is never reported.
func (r *QuarantineRecord) GetSwitchGroupName() string {
	// Tolerate a nil receiver, matching GetActionString on the same type.
	if r == nil || r.Action != rmpb.RunawayAction_SwitchGroup {
		return ""
	}
	return r.SwitchGroupName
}

// GetActionString returns the string form of the record's action.
// A nil record yields the string of the none action; a switch-group action
// has the target group name appended in parentheses, e.g. "<action>(<group>)".
func (r *QuarantineRecord) GetActionString() string {
	switch {
	case r == nil:
		return rmpb.RunawayAction_NoneAction.String()
	case r.Action == rmpb.RunawayAction_SwitchGroup:
		return r.Action.String() + "(" + r.SwitchGroupName + ")"
	default:
		return r.Action.String()
	}
}

func writeInsert(builder *strings.Builder, tableName string) {
builder.WriteString("insert into ")
builder.WriteString(tableName)
Expand All @@ -115,7 +136,7 @@ func (r *QuarantineRecord) GenInsertionStmt() (string, []any) {
var builder strings.Builder
params := make([]any, 0, 6)
writeInsert(&builder, RunawayWatchTableName)
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?)")
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName)
params = append(params, r.StartTime)
if r.EndTime.Equal(NullTime) {
Expand All @@ -127,6 +148,7 @@ func (r *QuarantineRecord) GenInsertionStmt() (string, []any) {
params = append(params, r.WatchText)
params = append(params, r.Source)
params = append(params, r.Action)
params = append(params, r.GetSwitchGroupName())
return builder.String(), params
}

Expand All @@ -135,7 +157,7 @@ func (r *QuarantineRecord) GenInsertionDoneStmt() (string, []any) {
var builder strings.Builder
params := make([]any, 0, 9)
writeInsert(&builder, RunawayWatchDoneTableName)
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?)")
builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ID)
params = append(params, r.ResourceGroupName)
params = append(params, r.StartTime)
Expand All @@ -148,6 +170,7 @@ func (r *QuarantineRecord) GenInsertionDoneStmt() (string, []any) {
params = append(params, r.WatchText)
params = append(params, r.Source)
params = append(params, r.Action)
params = append(params, r.GetSwitchGroupName())
params = append(params, time.Now().UTC())
return builder.String(), params
}
Expand Down Expand Up @@ -256,7 +279,11 @@ func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDiges
return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest, startTime)
}

func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, ttl time.Duration, now *time.Time) {
func (rm *RunawayManager) markQuarantine(
resourceGroupName, convict string,
watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, switchGroupName string,
ttl time.Duration, now *time.Time,
) {
var endTime time.Time
if ttl > 0 {
endTime = now.UTC().Add(ttl)
Expand All @@ -269,6 +296,7 @@ func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watc
WatchText: convict,
Source: rm.serverID,
Action: action,
SwitchGroupName: switchGroupName,
}
// Add record without ID into watch list in this TiDB right now.
rm.addWatchList(record, ttl, false)
Expand Down Expand Up @@ -423,12 +451,12 @@ func (rm *RunawayManager) StaleQuarantineRecordChan() <-chan *QuarantineRecord {
}

// examineWatchList checks whether the query is in the watch list.
func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict string) (bool, rmpb.RunawayAction) {
func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict string) (bool, rmpb.RunawayAction, string) {
item := rm.getWatchFromWatchList(resourceGroupName + "/" + convict)
if item == nil {
return false, 0
return false, 0, ""
}
return true, item.Action
return true, item.Action, item.GetSwitchGroupName()
}

// Stop stops the watchList which is a ttlcache.
Expand Down Expand Up @@ -457,7 +485,12 @@ type RunawayChecker struct {
markedByRule atomic.Bool
// markedByWatch is set to true when the query matches the specified watch rules.
markedByWatch bool
watchAction rmpb.RunawayAction
// watchAction is the specified watch action for the runaway query.
// If it's not given, the action defined in `settings` will be used.
watchAction rmpb.RunawayAction
// watchSwitchGroupName is the specified switch group name for the runaway query.
// If it's not given, the switch group name defined in `settings` will be used.
watchSwitchGroupName string
}

func newRunawayChecker(
Expand All @@ -480,38 +513,47 @@ func newRunawayChecker(
}

// BeforeExecutor checks whether query is in watch list before executing and after compiling.
func (r *RunawayChecker) BeforeExecutor() error {
func (r *RunawayChecker) BeforeExecutor() (string, error) {
if r == nil {
return nil
return "", nil
}
var (
watched bool
action rmpb.RunawayAction
switchGroupName string
)
// Check if the query matches any specified watch rules.
for _, convict := range r.getConvictIdentifiers() {
watched, action := r.manager.examineWatchList(r.resourceGroupName, convict)
watched, action, switchGroupName = r.manager.examineWatchList(r.resourceGroupName, convict)
if !watched {
continue
}
// Use the group runaway settings if none are provided.
if action == rmpb.RunawayAction_NoneAction && r.settings != nil {
action = r.settings.Action
switchGroupName = r.settings.SwitchGroupName
}
// Mark it if this is the first time being watched.
r.markRunawayByWatch(action)
r.markRunawayByWatch(action, switchGroupName)
// Take action if needed.
switch action {
case rmpb.RunawayAction_Kill:
// Return an error to interrupt the query.
return exeerrors.ErrResourceGroupQueryRunawayQuarantine
return "", exeerrors.ErrResourceGroupQueryRunawayQuarantine
case rmpb.RunawayAction_CoolDown:
// This action will be handled in `BeforeCopRequest`.
return nil
return "", nil
case rmpb.RunawayAction_DryRun:
// Noop.
return nil
return "", nil
case rmpb.RunawayAction_SwitchGroup:
// Return the switch group name to switch the resource group before executing.
return switchGroupName, nil
default:
// Continue to examine other convicts.
}
}
return nil
return "", nil
}

// CheckAction is used to check current action of the query.
Expand Down Expand Up @@ -542,7 +584,7 @@ func (r *RunawayChecker) CheckRuleKillAction() bool {
if until > 0 {
return false
}
r.markRunawayByIdentify(r.settings.Action, &now)
r.markRunawayByIdentify(&now)
return r.settings.Action == rmpb.RunawayAction_Kill
}
return false
Expand Down Expand Up @@ -582,7 +624,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// execution time exceeds the threshold, mark the query as runaway
r.markRunawayByIdentify(r.settings.Action, &now)
r.markRunawayByIdentify(&now)
// Take action if needed.
switch r.settings.Action {
case rmpb.RunawayAction_Kill:
Expand All @@ -605,7 +647,7 @@ func (r *RunawayChecker) CheckCopRespError(err error) error {
if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") {
if !r.markedByRule.Load() {
now := time.Now()
if r.deadline.Before(now) && r.markRunawayByIdentify(r.settings.Action, &now) {
if r.deadline.Before(now) && r.markRunawayByIdentify(&now) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
}
}
Expand All @@ -623,29 +665,41 @@ func (r *RunawayChecker) markQuarantine(now *time.Time) {
}
ttl := time.Duration(r.settings.Watch.LastingDurationMs) * time.Millisecond

r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.settings.Watch.Type, r.settings.Action, ttl, now)
r.manager.markQuarantine(
r.resourceGroupName, r.getSettingConvictIdentifier(),
r.settings.Watch.Type, r.settings.Action, r.settings.SwitchGroupName,
ttl, now,
)
}

func (r *RunawayChecker) markRunawayByIdentify(action rmpb.RunawayAction, now *time.Time) bool {
func (r *RunawayChecker) markRunawayByIdentify(now *time.Time) bool {
swapped := r.markedByRule.CompareAndSwap(false, true)
if swapped {
r.markRunaway("identify", action, now)
r.markRunaway("identify", r.settings.Action, r.settings.SwitchGroupName, now)
if !r.markedByWatch {
r.markQuarantine(now)
}
}
return swapped
}

func (r *RunawayChecker) markRunawayByWatch(action rmpb.RunawayAction) {
func (r *RunawayChecker) markRunawayByWatch(action rmpb.RunawayAction, switchGroupName string) {
r.markedByWatch = true
r.watchAction = action
r.watchSwitchGroupName = switchGroupName
now := time.Now()
r.markRunaway("watch", action, &now)
r.markRunaway("watch", action, switchGroupName, &now)
}

func (r *RunawayChecker) markRunaway(matchType string, action rmpb.RunawayAction, now *time.Time) {
actionStr := strings.ToLower(action.String())
// markRunaway reports a runaway match for the current query.
//
// It builds a lower-cased action label, increments RunawayCheckerCounter with
// the resource group name, matchType and that label, and forwards the same
// information to the manager's markRunaway together with the original SQL,
// the plan digest and now.
//
// matchType is passed through unchanged; callers in this file use "identify"
// and "watch". switchGroupName is only used when action is SwitchGroup, where
// the label becomes "<action>(<switchGroupName>)".
func (r *RunawayChecker) markRunaway(matchType string, action rmpb.RunawayAction, switchGroupName string, now *time.Time) {
	// Start from the plain action name so that every action value, including
	// one this code does not know about, yields a non-empty label instead of
	// silently falling through a switch with no default.
	actionStr := action.String()
	if action == rmpb.RunawayAction_SwitchGroup {
		actionStr = fmt.Sprintf("%s(%s)", actionStr, switchGroupName)
	}
	actionStr = strings.ToLower(actionStr)
	metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc()
	r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func getRunawayWatchRecord(exec sqlexec.RestrictedSQLExecutor, reader *SystemTab
WatchText: r.GetString(5),
Source: r.GetString(6),
Action: rmpb.RunawayAction(r.GetInt64(7)),
SwitchGroupName: r.GetString(8),
}
// If a TiDB instance writes a record slowly, a record with an earlier start time may be inserted later than others.
// So we start the scan a little earlier.
Expand Down Expand Up @@ -607,6 +608,7 @@ func getRunawayWatchDoneRecord(exec sqlexec.RestrictedSQLExecutor, reader *Syste
WatchText: r.GetString(6),
Source: r.GetString(7),
Action: rmpb.RunawayAction(r.GetInt64(8)),
SwitchGroupName: r.GetString(9),
}
// Ditto as getRunawayWatchRecord.
if push {
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = rm.DeriveChecker(stmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String(), sessionVars.StartTime)
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
switchGroupName, err := stmtCtx.RunawayChecker.BeforeExecutor()
if err != nil {
return nil, err
}
if len(switchGroupName) > 0 {
stmtCtx.ResourceGroupName = switchGroupName
}
}
ctx = a.observeStmtBeginForTopSQL(ctx)

Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3556,7 +3556,6 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watches := do.GetRunawayWatchList()
rows := make([][]types.Datum, 0, len(watches))
for _, watch := range watches {
action := watch.Action
row := types.MakeDatums(
watch.ID,
watch.ResourceGroupName,
Expand All @@ -3565,7 +3564,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watch.Watch.String(),
watch.WatchText,
watch.Source,
action.String(),
watch.GetActionString(),
)
if watch.EndTime.Equal(resourcegroup.NullTime) {
row[3].SetString("UNLIMITED", mysql.DefaultCollationName)
Expand Down Expand Up @@ -3607,12 +3606,12 @@ func (e *memtableRetriever) setDataFromResourceGroups() error {
}
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
actionType := model.RunawayActionType(setting.ActionV2.GetType())
actionType := model.RunawayActionType(setting.Action)
switch actionType {
case model.RunawayActionDryRun, model.RunawayActionCooldown, model.RunawayActionKill:
fmt.Fprintf(limitBuilder, ", ACTION=%s", actionType.String())
case model.RunawayActionSwitchGroup:
fmt.Fprintf(limitBuilder, ", ACTION=%s(%s)", actionType.String(), setting.ActionV2.GetSwitchGroupName())
fmt.Fprintf(limitBuilder, ", ACTION=%s(%s)", actionType.String(), setting.SwitchGroupName)
default:
}
if setting.Watch != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/internal/querywatch/query_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func setWatchOption(ctx context.Context,
}
case ast.QueryWatchAction:
record.Action = rmpb.RunawayAction(op.ActionOption.Type)
record.SwitchGroupName = op.ActionOption.SwitchGroupName.String()
case ast.QueryWatchType:
textOption := op.TextOption
expr, err := plannerutil.RewriteAstExprWithPlanCtx(sctx.GetPlanCtx(), textOption.PatternExpr, nil, nil, false)
Expand Down Expand Up @@ -149,7 +150,9 @@ func validateWatchRecord(record *resourcegroup.QuarantineRecord, client *rmclien
return errors.Errorf("must set runaway config for resource group `%s`", record.ResourceGroupName)
}
record.Action = rg.RunawaySettings.Action
record.SwitchGroupName = rg.RunawaySettings.SwitchGroupName
}
// TODO: validate the switch group.
if record.Watch == rmpb.RunawayWatchType_NoneWatch {
return errors.Errorf("must specify watch type")
}
Expand Down
Loading

0 comments on commit 319c7b1

Please sign in to comment.