Skip to content

Commit

Permalink
调整为全局gorm连接
Browse files Browse the repository at this point in the history
  • Loading branch information
milkystar6 committed Jan 24, 2024
1 parent 1f8d9fc commit ceb2e47
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,36 @@ package customize_exporter

import (
"fmt"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/config"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/model"
mo "github.com/flipped-aurora/gin-vue-admin/server/model/saasdb"
"gorm.io/gorm"
)

func (c *CustomizeCollector) ActiveSessions() {
cfg := config.LoadConfig
func (c *CustomizeCollector) ActiveSessions(dbInformationSchema dbConnCfg, csaas *gorm.DB, localdb *gorm.DB) {
//cfg := config.LoadConfig
// 访问saasdb ==> get 在saasdb 注册了的数据库的端口
// 根据端口 去分别查询数据库
localAddr := cfg.MyHostAddrInfo.MyIP

csaas := c.connSaasdb()
var ins mo.Instance
portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
for _, v := range portSlice {

dbInformationSchema := dbConnCfg{
//User: config.LoadConfig.MySQLManager.MysqlManagerUser,
//Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
Host: localAddr,
Port: v,
Db: informationSchema,
}
go c.CountActiveSessions(dbInformationSchema, csaas)
}
//localAddr := cfg.MyHostAddrInfo.MyIP
//
//csaas, _ := c.connSaasdb()
//var ins mo.Instance
//portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
//for _, v := range portSlice {
//
// dbInformationSchema := dbConnCfg{
// //User: config.LoadConfig.MySQLManager.MysqlManagerUser,
// //Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
// Host: localAddr,
// Port: v,
// Db: informationSchema,
// }
// go c.CountActiveSessions(dbInformationSchema, csaas)
//}
go c.CountActiveSessions(dbInformationSchema, csaas, localdb)
}

// CountActiveSessions 统计活跃会话的个数
func (c *CustomizeCollector) CountActiveSessions(dbInformationSchema dbConnCfg, csaasdb *gorm.DB) {
func (c *CustomizeCollector) CountActiveSessions(dbInformationSchema dbConnCfg, csaasdb *gorm.DB, db *gorm.DB) {
// 定义会话状态常量
//var SessionStates = []string{
// "INIT",
Expand All @@ -54,7 +54,7 @@ func (c *CustomizeCollector) CountActiveSessions(dbInformationSchema dbConnCfg,
// "UPDATE BY KEYCACHE",
// "WRITE TO BINLOG",
//}
db := c.connLocalMySQL(dbInformationSchema)
//db, _ := c.connLocalMySQL(dbInformationSchema)
var pro model.InformationSchemaProcesslist
//count := pro.CountActiveSessions(db, SessionStates)

Expand Down Expand Up @@ -87,6 +87,6 @@ func (c *CustomizeCollector) CountActiveSessions(dbInformationSchema dbConnCfg,
}

// 关闭连接
c.CloseDB(db)
c.CloseDB(csaasdb)
//c.CloseDB(db)
//c.CloseDB(csaasdb)
}
55 changes: 26 additions & 29 deletions server/grpcServer/customize_exporter/collector_for_dumpbinlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,42 @@ package customize_exporter

import (
"fmt"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/config"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/model"
mo "github.com/flipped-aurora/gin-vue-admin/server/model/saasdb"
"gorm.io/gorm"
)

/* 检查是否有过多的keyForOggOrDpUser的slave 会话 */

func (c *CustomizeCollector) CheckBinlogDumpThreadsCounts() {
cfg := config.LoadConfig
func (c *CustomizeCollector) CheckBinlogDumpThreadsCounts(dbInformationSchema dbConnCfg, csaas *gorm.DB, localdb *gorm.DB) {
//cfg := config.LoadConfig
// 访问saasdb ==> get 在saasdb 注册了的数据库的端口
// 根据端口 去分别查询数据库
localAddr := cfg.MyHostAddrInfo.MyIP

csaas := c.connSaasdb()
var ins mo.Instance
portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)

for _, v := range portSlice {

dbInformationSchema := dbConnCfg{
//User: config.LoadConfig.MySQLManager.MysqlManagerUser,
//Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
Host: localAddr,
Port: v,
Db: informationSchema,
}

go c.CheckBinlogDumpThreadsCountsWorker(dbInformationSchema, csaas)
}
//localAddr := cfg.MyHostAddrInfo.MyIP

//csaas := c.connSaasdb()
//var ins mo.Instance
//portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
//
//for _, v := range portSlice {
//
// dbInformationSchema := dbConnCfg{
// //User: config.LoadConfig.MySQLManager.MysqlManagerUser,
// //Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
// Host: localAddr,
// Port: v,
// Db: informationSchema,
// }
//
// go c.CheckBinlogDumpThreadsCountsWorker(localdb, csaas)
//}
go c.CheckBinlogDumpThreadsCountsWorker(dbInformationSchema, localdb, csaas)
}

func (c *CustomizeCollector) CheckBinlogDumpThreadsCountsWorker(dbInformationSchema dbConnCfg, csaasdb *gorm.DB) {
db := c.connLocalMySQL(dbInformationSchema)
//func (c *CustomizeCollector) CheckBinlogDumpThreadsCountsWorker(dbInformationSchema dbConnCfg, csaasdb *gorm.DB) {

func (c *CustomizeCollector) CheckBinlogDumpThreadsCountsWorker(dbInformationSchema dbConnCfg, db *gorm.DB, csaasdb *gorm.DB) {
//db := c.connLocalMySQL(dbInformationSchema)
var pro model.InformationSchemaProcesslist

_, count := pro.GetProcesslistWithCommandAndUser(db, keyForDumpBinlog, keyForOggOrDpUser)
Expand All @@ -47,13 +48,9 @@ func (c *CustomizeCollector) CheckBinlogDumpThreadsCountsWorker(dbInformationSch

db1 := csaasdb
webhook, _ := wb.GetHookInfo(db1)

//TODO 根据IP、PORT获取MySQL的实例名称

// 推送webhook消息
if count > maxDumpStatConnNum {
fmtJson := fmt.Sprintf(`{"message_topic":"%v","ins_ip": "%v","ins_port":"%v","suppress_duration":%v,"info":"%v,counts:%v"}`, msgJsonTopicDumpGtid, dbInformationSchema.Host, dbInformationSchema.Port, oggOrDpSuppressDuration, keyForDumpBinlog, count)
fmt.Println(fmtJson)
data := []byte(fmtJson)
url := fmt.Sprintf("%v/api/reset", webhook.WebhookUrl)
headers := map[string]string{
Expand All @@ -66,6 +63,6 @@ func (c *CustomizeCollector) CheckBinlogDumpThreadsCountsWorker(dbInformationSch
}

// 关闭连接
c.CloseDB(db)
c.CloseDB(csaasdb)
//c.CloseDB(db)
//c.CloseDB(csaasdb)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package customize_exporter
import (
"fmt"
al "github.com/flipped-aurora/gin-vue-admin/server/grpcServer/agent_logger"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/config"
mo "github.com/flipped-aurora/gin-vue-admin/server/model/saasdb"
"gorm.io/gorm"
"time"
)
Expand All @@ -25,35 +23,41 @@ type Transaction struct {
DiffSec string `gorm:"column:diff_sec"`
}

func (c *CustomizeCollector) GetLongQuerySql() {
cfg := config.LoadConfig
// 访问saasdb ==> get 在saasdb 注册了的数据库的端口
// 根据端口 去分别查询数据库
localAddr := cfg.MyHostAddrInfo.MyIP

csaas := c.connSaasdb()
var ins mo.Instance
portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
for _, v := range portSlice {

dbInformationSchema := dbConnCfg{
//User: config.LoadConfig.MySQLManager.MysqlManagerUser,
//Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
Host: localAddr,
Port: v,
Db: informationSchema,
func (c *CustomizeCollector) GetLongQuerySql(dbInformationSchema dbConnCfg, csaas *gorm.DB, localdb *gorm.DB) {
//cfg := config.LoadConfig
//// 访问saasdb ==> get 在saasdb 注册了的数据库的端口
//// 根据端口 去分别查询数据库
//localAddr := cfg.MyHostAddrInfo.MyIP
//
//csaas, _ := c.connSaasdb()
//var ins mo.Instance
//portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
//for _, v := range portSlice {
//
// dbInformationSchema := dbConnCfg{
// //User: config.LoadConfig.MySQLManager.MysqlManagerUser,
// //Passwd: config.LoadConfig.MySQLManager.MysqlManagerPassword,
// Host: localAddr,
// Port: v,
// Db: informationSchema,
// }
// go func() {
// err := c.LongQuerySql(dbInformationSchema, csaas)
// if err != nil {
// al.Error(fmt.Sprintf("分析长事务出错: %v", err))
// }
// }()
//}
go func() {
err := c.LongQuerySql(dbInformationSchema, csaas, localdb)
if err != nil {
al.Error(fmt.Sprintf("分析长事务出错: %v", err))
}
go func() {
err := c.LongQuerySql(dbInformationSchema, csaas)
if err != nil {
al.Error(fmt.Sprintf("分析长事务出错: %v", err))
}
}()
}
}()
}

func (c *CustomizeCollector) LongQuerySql(cfg dbConnCfg, csaas *gorm.DB) (e error) {
db := c.connLocalMySQL(cfg)
func (c *CustomizeCollector) LongQuerySql(cfg dbConnCfg, csaas *gorm.DB, db *gorm.DB) (e error) {
//db, _ := c.connLocalMySQL(cfg)

tol := int64(0)
yyy := "%S_TIMETASK%"
Expand Down Expand Up @@ -136,7 +140,7 @@ inner join PERFORMANCE_SCHEMA .events_statements_current d on
SendMsg2WebHook(csaas, msg)
}

c.CloseDB(db)
c.CloseDB(csaas)
//c.CloseDB(db)
//c.CloseDB(csaas)
return e
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var tbs []tb

func (c *CustomizeCollector) functionPkTable(cfg dbConnCfg) ([]interface{}, []interface{}, error) {

db := c.connLocalMySQL(cfg)
db, _ := c.connLocalMySQL(cfg)
collectorSQL = `SELECT
T.TABLE_SCHEMA,
T.TABLE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
al "github.com/flipped-aurora/gin-vue-admin/server/grpcServer/agent_logger"
"github.com/flipped-aurora/gin-vue-admin/server/grpcServer/config"
mo "github.com/flipped-aurora/gin-vue-admin/server/model/saasdb"
"gorm.io/gorm"
"os"
"reflect"
Expand All @@ -25,32 +23,33 @@ var slowQueryEntry []string
var slowQueryInfo InsSlowQueryLog

// SlowQueryLog 用于分析和获取slow log
func (c *CustomizeCollector) SlowQueryLog() {
cfg := config.LoadConfig
localAddr := cfg.MyHostAddrInfo.MyIP

csaas := c.connSaasdb()
var ins mo.Instance
portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)

for _, v := range portSlice {
dbInformationSchema := dbConnCfg{
Host: localAddr,
Port: v,
Db: informationSchema,
}
go c.getSlowQueryLog(dbInformationSchema, csaas)
}
func (c *CustomizeCollector) SlowQueryLog(dbInformationSchema dbConnCfg, csaas *gorm.DB, localdb *gorm.DB) {
//cfg := config.LoadConfig
//localAddr := cfg.MyHostAddrInfo.MyIP
//
//csaas, _ := c.connSaasdb()
//var ins mo.Instance
//portSlice, _ := ins.QueryPortsByIP(csaas, localAddr, keyForMySQL)
//
//for _, v := range portSlice {
// dbInformationSchema := dbConnCfg{
// Host: localAddr,
// Port: v,
// Db: informationSchema,
// }
// go c.getSlowQueryLog(dbInformationSchema, csaas)
//}
go c.getSlowQueryLog(dbInformationSchema, csaas, localdb)
}

func (c *CustomizeCollector) getSlowQueryLog(dbInformationSchema dbConnCfg, csaasdb *gorm.DB) {
db := c.connLocalMySQL(dbInformationSchema)
func (c *CustomizeCollector) getSlowQueryLog(dbInformationSchema dbConnCfg, csaasdb *gorm.DB, db *gorm.DB) {
//db, _ := c.connLocalMySQL(dbInformationSchema)
isSwitchOn := c.GetVariables(db, "slow_query_log", 0)
slowQueryLogDir := c.GetVariables(db, "slow_query_log_file", 0)

//slowQueryLogDir = fmt.Sprintf("%v-testfile.log", dbInformationSchema.Port)
// 关闭db连接
c.CloseDB(db)
defer c.CloseDB(csaasdb)
//c.CloseDB(db)
//defer c.CloseDB(csaasdb)
if strings.ToLower(isSwitchOn) == "off" {
al.Info("未打开日志,协程结束")
// 协程结束
Expand Down
Loading

0 comments on commit ceb2e47

Please sign in to comment.