diff --git a/backend/interface.go b/backend/interface.go index 4d2e8f4a..3302fc93 100644 --- a/backend/interface.go +++ b/backend/interface.go @@ -26,6 +26,7 @@ type PooledConnect interface { WriteSetStatement() error GetConnectionID() int64 GetReturnTime() time.Time + Id() uint32 } type ConnectionPool interface { diff --git a/backend/mocks/PooledConnect.go b/backend/mocks/PooledConnect.go index dd4980f4..6fc71de5 100644 --- a/backend/mocks/PooledConnect.go +++ b/backend/mocks/PooledConnect.go @@ -268,3 +268,17 @@ func (_m *PooledConnect) Ping() error { func (_m *PooledConnect) GetReturnTime() time.Time { return time.Now() } + +// Id provides a mock function with given fields: +func (_m *PooledConnect) Id() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} diff --git a/backend/pooled_connection.go b/backend/pooled_connection.go index 2dcbe676..b1abb0a5 100644 --- a/backend/pooled_connection.go +++ b/backend/pooled_connection.go @@ -27,6 +27,10 @@ type pooledConnectImpl struct { returnTime time.Time } +func (pc *pooledConnectImpl) Id() uint32 { + return pc.directConnection.conn.ConnectionID +} + // Recycle return PooledConnect to the pool func (pc *pooledConnectImpl) Recycle() { //if has error,the connection can’t be recycled diff --git a/go.mod b/go.mod index 5aba52f5..70100544 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect - github.com/emirpasic/gods v1.12.0 + github.com/emirpasic/gods v1.18.1 github.com/gin-contrib/gzip v0.0.1 github.com/gin-gonic/gin v1.7.2 github.com/go-ini/ini v1.42.0 @@ -25,6 +25,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.0 // indirect + github.com/heimdalr/dag v1.1.1 // indirect github.com/jonboulle/clockwork v0.1.0 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/mattn/go-isatty v0.0.13 // indirect diff --git a/models/proxy.go b/models/proxy.go index 94efada1..fe3b8ec7 100644 --- a/models/proxy.go +++ b/models/proxy.go @@ -55,11 +55,11 @@ type Proxy struct { AdminPassword string `ini:"admin_password"` SlowSQLTime int64 `ini:"slow_sql_time"` SessionTimeout int `ini:"session_timeout"` + DeadlockCheckInterval int `ini:"deadlock_check_interval"` // 监控配置 StatsEnabled string `ini:"stats_enabled"` // set true to enable stats StatsInterval int `ini:"stats_interval"` // set stats interval of connect pool - EncryptKey string `ini:"encrypt_key"` ServerVersion string `ini:"server_version"` @@ -87,6 +87,11 @@ func ParseProxyConfigFromFile(cfgFile string) (*Proxy, error) { } else if proxyConfig.Cluster != "" { proxyConfig.CoordinatorRoot = "/" + proxyConfig.Cluster } + + if proxyConfig.DeadlockCheckInterval == 0 { + proxyConfig.DeadlockCheckInterval = 5 + } + return proxyConfig, err } diff --git a/proxy/server/deadlock_checker.go b/proxy/server/deadlock_checker.go new file mode 100644 index 00000000..b034e7cf --- /dev/null +++ b/proxy/server/deadlock_checker.go @@ -0,0 +1,120 @@ +/* + * @Author: funnyAnt + * @Date: 2022-05-16 11:52:01 + * @LastEditTime: 2022-05-17 10:56:29 + * @LastEditors: + * @Description: 分布式死锁检查 + */ +package server + +import ( + "strconv" + + "github.com/XiaoMi/Gaea/backend" + "github.com/XiaoMi/Gaea/log" + "github.com/heimdalr/dag" +) + + func doDeadlockCheck(s *Server, slices map[string]*backend.Slice) error { + //init dag + d := dag.NewDAG() + + addDiVertex := func(value string) error { + if v, _ := d.GetVertex(value); v == nil { + if err := d.AddVertexByID(value, value); err != nil { + return err + } + } + + return nil + } + + addDiEdge := func(from string, to string) (loopId string, err error) { + if err = addDiVertex(from); err != nil { + return "", err + } + + if err = addDiVertex(to); err != nil { + return "", err + } + + log.Debug("add dig[from:%v,to:%v]", from, to) + if err = d.AddEdge(from, to); err != nil { + if _, ok := err.(dag.EdgeLoopError); ok { + loopId = from + return loopId, err + } + } + + return "", nil + + } + + backend2FrontendMap := fetchBackend2FrontendConn(s) + for k, v := range slices { + toKill, err := fetchLockWaits(v, func(waiting, blocking string) (string, error) { + keyFrom := k + backend2FrontendDelimiter + waiting + if v, ok := backend2FrontendMap[keyFrom]; ok { + frontendIdFrom := v.c.ConnectionID + keyTo := k + backend2FrontendDelimiter + blocking + if v, ok := backend2FrontendMap[keyTo]; ok { + frontendIdTo := v.c.ConnectionID + loopId, err := addDiEdge(strconv.Itoa(int(frontendIdFrom)), strconv.Itoa(int(frontendIdTo))) + if loopId != "" { + return keyFrom, err + } + } + } + return "", nil + }) + + log.Debug("add dig info[%v]", d.String()) + if toKill != "" { + //kill the frontend + killFrontendConn(backend2FrontendMap[toKill]) + continue + } + if err != nil { + return err + } + + } + + return nil + } + + func fetchLockWaits(slice *backend.Slice, callback func(waiting, blocking string) (string, error)) (string, error) { + const MAX_ROWS = 10000 + sql := "SELECT distinct trx_a.trx_mysql_thread_id AS waiting, trx_b.trx_mysql_thread_id AS blocking " + + "FROM information_schema.innodb_lock_waits, information_schema.INNODB_TRX AS trx_a, information_schema.INNODB_TRX AS trx_b " + + "WHERE trx_a.trx_id = requesting_trx_id AND trx_b.trx_id = blocking_trx_id;" + + //doQuery + conn, err := slice.GetMasterConn() + if err != nil { + return "", err + } + result, err := conn.Execute(sql, MAX_ROWS) + if err != nil { + return "", err + } + for _, rowValue := range result.Values { + waiting := strconv.Itoa(int(rowValue[0].(uint64))) + blocking := strconv.Itoa(int(rowValue[1].(uint64))) + if toKill, err := callback(waiting, blocking); err != nil { + return toKill, err + } + } + return "", nil + } + + func fetchBackend2FrontendConn(s *Server) map[string]*Session { + return s.getBackend2FrontendMap() + } + + func killFrontendConn(session *Session) { + if session != nil { + session.forceClose("deadlock check") + } + } + \ No newline at end of file diff --git a/proxy/server/executor.go b/proxy/server/executor.go index f4f74be4..18bb3010 100644 --- a/proxy/server/executor.go +++ b/proxy/server/executor.go @@ -40,6 +40,9 @@ const ( masterComment = "/*master*/" // general query log variable gaeaGeneralLogVariable = "gaea_general_log" + + //Delimiter between backendConnId and frontendConnId + backend2FrontendDelimiter = "@@" ) // SessionExecutor is bound to a session, so requests are serializable @@ -827,3 +830,30 @@ func (se *SessionExecutor) ExecuteSQLs(reqCtx *util.RequestContext, sqls map[str } return rs, nil } + +func (se *SessionExecutor) forceClose() (err error) { + se.txLock.Lock() + defer se.txLock.Unlock() + + se.status &= ^mysql.ServerStatusInTrans + + for _, pc := range se.txConns { + pc.Close() + pc.Recycle() + } + + se.txConns = make(map[string]backend.PooledConnect) + return +} + + +func (se *SessionExecutor) getAllTrxConn() map[string]backend.PooledConnect { + se.txLock.Lock() + defer se.txLock.Unlock() + txConns := make(map[string]backend.PooledConnect, len(se.txConns)) + for k, v := range se.txConns { + txConns[k+backend2FrontendDelimiter+strconv.Itoa(int(v.Id()))] = v + } + + return txConns +} \ No newline at end of file diff --git a/proxy/server/manager.go b/proxy/server/manager.go index 76f53015..1d09d79f 100644 --- a/proxy/server/manager.go +++ b/proxy/server/manager.go @@ -134,6 +134,7 @@ type Manager struct { namespaces [2]*NamespaceManager users [2]*UserManager statistics *StatisticManager + ProxyCfg *models.Proxy } // NewManager return empty Manager @@ -144,7 +145,8 @@ func NewManager() *Manager { // CreateManager create manager func CreateManager(cfg *models.Proxy, namespaceConfigs map[string]*models.Namespace) (*Manager, error) { m := NewManager() - + m.ProxyCfg = cfg + // init statistics statisticManager, err := CreateStatisticManager(cfg, m) if err != nil { @@ -262,6 +264,11 @@ func (m *Manager) GetNamespace(name string) *Namespace { return m.namespaces[current].GetNamespace(name) } +func (m *Manager) GetNamespaces() map[string]*Namespace { + current, _, _ := m.switchIndex.Get() + return m.namespaces[current].GetNamespaces() +} + // CheckUser check if user in users func (m *Manager) CheckUser(user string) bool { current, _, _ := m.switchIndex.Get() diff --git a/proxy/server/server.go b/proxy/server/server.go index 23ad00d5..d74a5289 100644 --- a/proxy/server/server.go +++ b/proxy/server/server.go @@ -18,6 +18,7 @@ import ( "net" "runtime" "strconv" + "sync" "time" "fmt" @@ -27,6 +28,7 @@ import ( "github.com/XiaoMi/Gaea/mysql" "github.com/XiaoMi/Gaea/util" "github.com/XiaoMi/Gaea/util/sync2" + "github.com/XiaoMi/Gaea/util/timer" ) var ( @@ -36,15 +38,18 @@ var ( // Server means proxy that serve client request type Server struct { - closed sync2.AtomicBool - listener net.Listener - sessionTimeout time.Duration - tw *util.TimeWheel - adminServer *AdminServer - manager *Manager - EncryptKey string - ServerVersion string - AuthPlugin string + closed sync2.AtomicBool + listener net.Listener + sessionTimeout time.Duration + tw *util.TimeWheel + adminServer *AdminServer + manager *Manager + EncryptKey string + ServerVersion string + AuthPlugin string + frontendConn map[uint32]*Session + frontendConnLock sync.RWMutex + deadlockCheckTimer *timer.Timer } // NewServer create new server @@ -99,6 +104,7 @@ func NewServer(cfg *models.Proxy, manager *Manager) (*Server, error) { return nil, err } s.adminServer = adminServer + s.startDeadlockCheckTimer() log.Notice("server start succ, netProtoType: %s, addr: %s", cfg.ProtoType, cfg.ProxyAddr) return s, nil @@ -148,6 +154,9 @@ func (s *Server) onConn(c net.Conn) { cc.executor.db, cc.executor.namespace, cc.c.capability) + + s.addFrontend(cc.c.ConnectionID, cc) + cc.Run() } @@ -189,6 +198,49 @@ func (s *Server) Close() error { return nil } +func (s *Server) startDeadlockCheckTimer() { + isRunning := sync2.NewAtomicBool(false) + deadlockCheck := func() { + //保障只有一个job在运行 + if isRunning.CompareAndSwap(false, true) { + return + } + defer isRunning.CompareAndSwap(true, false) + for _, ns := range s.manager.GetNamespaces() { + if err := doDeadlockCheck(s, ns.slices); err != nil { + log.Warn("do deck lock check error[%v]", err) + } + } + } + + interval := s.manager.ProxyCfg.DeadlockCheckInterval + s.deadlockCheckTimer = timer.NewTimer(time.Duration(interval) * time.Second) + s.deadlockCheckTimer.Start(deadlockCheck) +} + +func (s *Server) addFrontend(connId uint32, session *Session) { + s.frontendConnLock.Lock() + defer s.frontendConnLock.Unlock() + s.frontendConn[connId] = session +} +func (s *Server) removeFrontend(connId uint32) { + s.frontendConnLock.Lock() + defer s.frontendConnLock.Unlock() + delete(s.frontendConn, connId) +} +func (s *Server) getBackend2FrontendMap() map[string]*Session { + s.frontendConnLock.RLock() + defer s.frontendConnLock.RUnlock() + + backend2FrontendMap := make(map[string]*Session, 128) + for _, s := range s.frontendConn { + for k, _ := range s.executor.getAllTrxConn() { + backend2FrontendMap[k] = s + } + } + return backend2FrontendMap +} + // ReloadNamespacePrepare config change prepare phase func (s *Server) ReloadNamespacePrepare(name string, client models.Client) error { // get namespace conf from etcd diff --git a/proxy/server/session.go b/proxy/server/session.go index 95097927..283308e7 100644 --- a/proxy/server/session.go +++ b/proxy/server/session.go @@ -210,8 +210,21 @@ func (cc *Session) Close() { } cc.c.Close() log.Debug("client closed, %d", cc.c.GetConnectionID()) + cc.proxy.removeFrontend(cc.c.ConnectionID) +} + +func (cc *Session) forceClose(reason string) { + if cc.IsClosed() { + return + } + cc.closed.Store(true) + if err := cc.executor.forceClose(); err != nil { + log.Warn("executor force close error: %v", err) + } + cc.c.Close() + log.Warn("client[%d] is forced close because of [%v]", cc.c.GetConnectionID(), reason) - return + cc.proxy.removeFrontend(cc.c.ConnectionID) } // IsClosed check if closed