diff --git a/sqle/server/auditplan/task_type_mysql_processlist.go b/sqle/server/auditplan/task_type_mysql_processlist.go index b654e699b..5cb8f739f 100644 --- a/sqle/server/auditplan/task_type_mysql_processlist.go +++ b/sqle/server/auditplan/task_type_mysql_processlist.go @@ -2,16 +2,16 @@ package auditplan import ( "context" + "database/sql" "fmt" "strconv" "time" - "github.com/actiontech/sqle/sqle/errors" - "github.com/actiontech/sqle/sqle/dms" "github.com/actiontech/sqle/sqle/driver/mysql/executor" "github.com/actiontech/sqle/sqle/driver/mysql/util" driverV2 "github.com/actiontech/sqle/sqle/driver/v2" + "github.com/actiontech/sqle/sqle/errors" "github.com/actiontech/sqle/sqle/locale" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/pkg/params" @@ -54,24 +54,6 @@ func (at *MySQLProcessListTaskV2) Audit(sqls []*model.SQLManageRecord) (*AuditRe return auditSQLs(sqls) } -func (at *MySQLProcessListTaskV2) processListSQL(ap *AuditPlan) string { - sql := ` -SELECT DISTINCT db,time,info -FROM information_schema.processlist -WHERE ID != connection_id() AND info != '' AND db NOT IN ('information_schema','performance_schema','mysql','sys') -%v -` - whereSqlMinSecond := "" - { - sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int() - if sqlMinSecond > 0 { - whereSqlMinSecond = fmt.Sprintf("AND TIME > %d", sqlMinSecond) - } - } - sql = fmt.Sprintf(sql, whereSqlMinSecond) - return sql -} - func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan, persist *model.Storage) ([]*SQLV2, error) { if ap.InstanceID == "" { return nil, fmt.Errorf("instance is not configured") @@ -99,17 +81,22 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan } defer db.Db.Close() - res, err := db.Db.Query(at.processListSQL(ap)) + // 查询 SHOW FULL PROCESSLIST + res, err := db.Db.Query("SHOW FULL PROCESSLIST") if err != nil { - return nil, fmt.Errorf("show processlist failed, error: %v", err) + return nil, fmt.Errorf("SHOW FULL PROCESSLIST failed, error: %v", err) } - if len(res) == 0 { + if len(res) <= 1 { // 仅有自己的连接 return nil, nil } cache := NewSQLV2Cache() + sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int() for i := range res { - query := res[i]["info"].String + if at.filterFullProcessList(res[i], sqlMinSecond, db.Db.GetConnectionID()) { + continue + } + query := res[i]["Info"].String sqlV2 := &SQLV2{ Source: ap.Type, SourceId: strconv.FormatUint(uint64(ap.InstanceAuditPlanId), 10), @@ -145,3 +132,21 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan } return cache.GetSQLs(), nil } + +func (at *MySQLProcessListTaskV2) filterFullProcessList(row map[string]sql.NullString, sqlMinSecond int, connID string) bool { + if sqlMinSecond > 0 { + queryTime, _ := strconv.Atoi(row["Time"].String) + if sqlMinSecond > queryTime { + return true + } + } + if row["Info"].String == "" || + row["Id"].String == connID || + row["db"].String == "information_schema" || + row["db"].String == "performance_schema" || + row["db"].String == "mysql" || + row["db"].String == "sys" { + return true + } + return false +}