Skip to content

Commit

Permalink
Merge pull request #2849 from actiontech/fix-MySQL-ProcessList-trunca…
Browse files Browse the repository at this point in the history
…ted-sql

Fix my sql process list truncated sql
  • Loading branch information
littleniannian authored Jan 2, 2025
2 parents ba9d6c4 + f938a9d commit 6cef63f
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions sqle/server/auditplan/task_type_mysql_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package auditplan

import (
"context"
"database/sql"
"fmt"
"strconv"
"time"

"github.com/actiontech/sqle/sqle/errors"

"github.com/actiontech/sqle/sqle/dms"
"github.com/actiontech/sqle/sqle/driver/mysql/executor"
"github.com/actiontech/sqle/sqle/driver/mysql/util"
driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
"github.com/actiontech/sqle/sqle/errors"
"github.com/actiontech/sqle/sqle/locale"
"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/pkg/params"
Expand Down Expand Up @@ -54,24 +54,6 @@ func (at *MySQLProcessListTaskV2) Audit(sqls []*model.SQLManageRecord) (*AuditRe
return auditSQLs(sqls)
}

func (at *MySQLProcessListTaskV2) processListSQL(ap *AuditPlan) string {
sql := `
SELECT DISTINCT db,time,info
FROM information_schema.processlist
WHERE ID != connection_id() AND info != '' AND db NOT IN ('information_schema','performance_schema','mysql','sys')
%v
`
whereSqlMinSecond := ""
{
sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int()
if sqlMinSecond > 0 {
whereSqlMinSecond = fmt.Sprintf("AND TIME > %d", sqlMinSecond)
}
}
sql = fmt.Sprintf(sql, whereSqlMinSecond)
return sql
}

func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan, persist *model.Storage) ([]*SQLV2, error) {
if ap.InstanceID == "" {
return nil, fmt.Errorf("instance is not configured")
Expand Down Expand Up @@ -99,17 +81,22 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan
}
defer db.Db.Close()

res, err := db.Db.Query(at.processListSQL(ap))
// 查询 SHOW FULL PROCESSLIST
res, err := db.Db.Query("SHOW FULL PROCESSLIST")
if err != nil {
return nil, fmt.Errorf("show processlist failed, error: %v", err)
return nil, fmt.Errorf("SHOW FULL PROCESSLIST failed, error: %v", err)
}

if len(res) == 0 {
if len(res) <= 1 { // 仅有自己的连接
return nil, nil
}
cache := NewSQLV2Cache()
sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int()
for i := range res {
query := res[i]["info"].String
if at.filterFullProcessList(res[i], sqlMinSecond, db.Db.GetConnectionID()) {
continue
}
query := res[i]["Info"].String
sqlV2 := &SQLV2{
Source: ap.Type,
SourceId: strconv.FormatUint(uint64(ap.InstanceAuditPlanId), 10),
Expand Down Expand Up @@ -145,3 +132,21 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan
}
return cache.GetSQLs(), nil
}

func (at *MySQLProcessListTaskV2) filterFullProcessList(row map[string]sql.NullString, sqlMinSecond int, connID string) bool {
if sqlMinSecond > 0 {
queryTime, _ := strconv.Atoi(row["Time"].String)
if sqlMinSecond > queryTime {
return true
}
}
if row["Info"].String == "" ||
row["Id"].String == connID ||
row["db"].String == "information_schema" ||
row["db"].String == "performance_schema" ||
row["db"].String == "mysql" ||
row["db"].String == "sys" {
return true
}
return false
}

0 comments on commit 6cef63f

Please sign in to comment.