Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#221 [new feature] distributed deadlock check #222

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type PooledConnect interface {
WriteSetStatement() error
GetConnectionID() int64
GetReturnTime() time.Time
Id() uint32
}

type ConnectionPool interface {
Expand Down
14 changes: 14 additions & 0 deletions backend/mocks/PooledConnect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/pooled_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type pooledConnectImpl struct {
returnTime time.Time
}

func (pc *pooledConnectImpl) Id() uint32 {
return pc.directConnection.conn.ConnectionID
}

// Recycle return PooledConnect to the pool
func (pc *pooledConnectImpl) Recycle() {
//if has error,the connection can’t be recycled
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/emirpasic/gods v1.12.0
github.com/emirpasic/gods v1.18.1
github.com/gin-contrib/gzip v0.0.1
github.com/gin-gonic/gin v1.7.2
github.com/go-ini/ini v1.42.0
Expand All @@ -25,6 +25,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.0 // indirect
github.com/heimdalr/dag v1.1.1 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
Expand Down
7 changes: 6 additions & 1 deletion models/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Proxy struct {
AdminPassword string `ini:"admin_password"`
SlowSQLTime int64 `ini:"slow_sql_time"`
SessionTimeout int `ini:"session_timeout"`
DeadlockCheckInterval int `ini:"deadlock_check_interval"`

// 监控配置
StatsEnabled string `ini:"stats_enabled"` // set true to enable stats
StatsInterval int `ini:"stats_interval"` // set stats interval of connect pool

EncryptKey string `ini:"encrypt_key"`

ServerVersion string `ini:"server_version"`
Expand Down Expand Up @@ -87,6 +87,11 @@ func ParseProxyConfigFromFile(cfgFile string) (*Proxy, error) {
} else if proxyConfig.Cluster != "" {
proxyConfig.CoordinatorRoot = "/" + proxyConfig.Cluster
}

if proxyConfig.DeadlockCheckInterval == 0 {
proxyConfig.DeadlockCheckInterval = 5
}

return proxyConfig, err
}

Expand Down
120 changes: 120 additions & 0 deletions proxy/server/deadlock_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* @Author: funnyAnt
* @Date: 2022-05-16 11:52:01
* @LastEditTime: 2022-05-17 10:56:29
* @LastEditors:
* @Description: 分布式死锁检查
*/
package server

import (
"strconv"

"github.com/XiaoMi/Gaea/backend"
"github.com/XiaoMi/Gaea/log"
"github.com/heimdalr/dag"
)

func doDeadlockCheck(s *Server, slices map[string]*backend.Slice) error {
//init dag
d := dag.NewDAG()

addDiVertex := func(value string) error {
if v, _ := d.GetVertex(value); v == nil {
if err := d.AddVertexByID(value, value); err != nil {
return err
}
}

return nil
}

addDiEdge := func(from string, to string) (loopId string, err error) {
if err = addDiVertex(from); err != nil {
return "", err
}

if err = addDiVertex(to); err != nil {
return "", err
}

log.Debug("add dig[from:%v,to:%v]", from, to)
if err = d.AddEdge(from, to); err != nil {
if _, ok := err.(dag.EdgeLoopError); ok {
loopId = from
return loopId, err
}
}

return "", nil

}

backend2FrontendMap := fetchBackend2FrontendConn(s)
for k, v := range slices {
toKill, err := fetchLockWaits(v, func(waiting, blocking string) (string, error) {
keyFrom := k + backend2FrontendDelimiter + waiting
if v, ok := backend2FrontendMap[keyFrom]; ok {
frontendIdFrom := v.c.ConnectionID
keyTo := k + backend2FrontendDelimiter + blocking
if v, ok := backend2FrontendMap[keyTo]; ok {
frontendIdTo := v.c.ConnectionID
loopId, err := addDiEdge(strconv.Itoa(int(frontendIdFrom)), strconv.Itoa(int(frontendIdTo)))
if loopId != "" {
return keyFrom, err
}
}
}
return "", nil
})

log.Debug("add dig info[%v]", d.String())
if toKill != "" {
//kill the frontend
killFrontendConn(backend2FrontendMap[toKill])
continue
}
if err != nil {
return err
}

}

return nil
}

func fetchLockWaits(slice *backend.Slice, callback func(waiting, blocking string) (string, error)) (string, error) {
const MAX_ROWS = 10000
sql := "SELECT distinct trx_a.trx_mysql_thread_id AS waiting, trx_b.trx_mysql_thread_id AS blocking " +
"FROM information_schema.innodb_lock_waits, information_schema.INNODB_TRX AS trx_a, information_schema.INNODB_TRX AS trx_b " +
"WHERE trx_a.trx_id = requesting_trx_id AND trx_b.trx_id = blocking_trx_id;"

//doQuery
conn, err := slice.GetMasterConn()
if err != nil {
return "", err
}
result, err := conn.Execute(sql, MAX_ROWS)
if err != nil {
return "", err
}
for _, rowValue := range result.Values {
waiting := strconv.Itoa(int(rowValue[0].(uint64)))
blocking := strconv.Itoa(int(rowValue[1].(uint64)))
if toKill, err := callback(waiting, blocking); err != nil {
return toKill, err
}
}
return "", nil
}

func fetchBackend2FrontendConn(s *Server) map[string]*Session {
return s.getBackend2FrontendMap()
}

func killFrontendConn(session *Session) {
if session != nil {
session.forceClose("deadlock check")
}
}

30 changes: 30 additions & 0 deletions proxy/server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
masterComment = "/*master*/"
// general query log variable
gaeaGeneralLogVariable = "gaea_general_log"

//Delimiter between backendConnId and frontendConnId
backend2FrontendDelimiter = "@@"
)

// SessionExecutor is bound to a session, so requests are serializable
Expand Down Expand Up @@ -827,3 +830,30 @@ func (se *SessionExecutor) ExecuteSQLs(reqCtx *util.RequestContext, sqls map[str
}
return rs, nil
}

func (se *SessionExecutor) forceClose() (err error) {
se.txLock.Lock()
defer se.txLock.Unlock()

se.status &= ^mysql.ServerStatusInTrans

for _, pc := range se.txConns {
pc.Close()
pc.Recycle()
}

se.txConns = make(map[string]backend.PooledConnect)
return
}


func (se *SessionExecutor) getAllTrxConn() map[string]backend.PooledConnect {
se.txLock.Lock()
defer se.txLock.Unlock()
txConns := make(map[string]backend.PooledConnect, len(se.txConns))
for k, v := range se.txConns {
txConns[k+backend2FrontendDelimiter+strconv.Itoa(int(v.Id()))] = v
}

return txConns
}
9 changes: 8 additions & 1 deletion proxy/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type Manager struct {
namespaces [2]*NamespaceManager
users [2]*UserManager
statistics *StatisticManager
ProxyCfg *models.Proxy
}

// NewManager return empty Manager
Expand All @@ -144,7 +145,8 @@ func NewManager() *Manager {
// CreateManager create manager
func CreateManager(cfg *models.Proxy, namespaceConfigs map[string]*models.Namespace) (*Manager, error) {
m := NewManager()

m.ProxyCfg = cfg

// init statistics
statisticManager, err := CreateStatisticManager(cfg, m)
if err != nil {
Expand Down Expand Up @@ -262,6 +264,11 @@ func (m *Manager) GetNamespace(name string) *Namespace {
return m.namespaces[current].GetNamespace(name)
}

func (m *Manager) GetNamespaces() map[string]*Namespace {
current, _, _ := m.switchIndex.Get()
return m.namespaces[current].GetNamespaces()
}

// CheckUser check if user in users
func (m *Manager) CheckUser(user string) bool {
current, _, _ := m.switchIndex.Get()
Expand Down
70 changes: 61 additions & 9 deletions proxy/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net"
"runtime"
"strconv"
"sync"
"time"

"fmt"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/XiaoMi/Gaea/mysql"
"github.com/XiaoMi/Gaea/util"
"github.com/XiaoMi/Gaea/util/sync2"
"github.com/XiaoMi/Gaea/util/timer"
)

var (
Expand All @@ -36,15 +38,18 @@ var (

// Server means proxy that serve client request
type Server struct {
closed sync2.AtomicBool
listener net.Listener
sessionTimeout time.Duration
tw *util.TimeWheel
adminServer *AdminServer
manager *Manager
EncryptKey string
ServerVersion string
AuthPlugin string
closed sync2.AtomicBool
listener net.Listener
sessionTimeout time.Duration
tw *util.TimeWheel
adminServer *AdminServer
manager *Manager
EncryptKey string
ServerVersion string
AuthPlugin string
frontendConn map[uint32]*Session
frontendConnLock sync.RWMutex
deadlockCheckTimer *timer.Timer
}

// NewServer create new server
Expand Down Expand Up @@ -99,6 +104,7 @@ func NewServer(cfg *models.Proxy, manager *Manager) (*Server, error) {
return nil, err
}
s.adminServer = adminServer
s.startDeadlockCheckTimer()

log.Notice("server start succ, netProtoType: %s, addr: %s", cfg.ProtoType, cfg.ProxyAddr)
return s, nil
Expand Down Expand Up @@ -148,6 +154,9 @@ func (s *Server) onConn(c net.Conn) {
cc.executor.db,
cc.executor.namespace,
cc.c.capability)

s.addFrontend(cc.c.ConnectionID, cc)

cc.Run()
}

Expand Down Expand Up @@ -189,6 +198,49 @@ func (s *Server) Close() error {
return nil
}

func (s *Server) startDeadlockCheckTimer() {
isRunning := sync2.NewAtomicBool(false)
deadlockCheck := func() {
//保障只有一个job在运行
if isRunning.CompareAndSwap(false, true) {
return
}
defer isRunning.CompareAndSwap(true, false)
for _, ns := range s.manager.GetNamespaces() {
if err := doDeadlockCheck(s, ns.slices); err != nil {
log.Warn("do deck lock check error[%v]", err)
}
}
}

interval := s.manager.ProxyCfg.DeadlockCheckInterval
s.deadlockCheckTimer = timer.NewTimer(time.Duration(interval) * time.Second)
s.deadlockCheckTimer.Start(deadlockCheck)
}

func (s *Server) addFrontend(connId uint32, session *Session) {
s.frontendConnLock.Lock()
defer s.frontendConnLock.Unlock()
s.frontendConn[connId] = session
}
func (s *Server) removeFrontend(connId uint32) {
s.frontendConnLock.Lock()
defer s.frontendConnLock.Unlock()
delete(s.frontendConn, connId)
}
func (s *Server) getBackend2FrontendMap() map[string]*Session {
s.frontendConnLock.RLock()
defer s.frontendConnLock.RUnlock()

backend2FrontendMap := make(map[string]*Session, 128)
for _, s := range s.frontendConn {
for k, _ := range s.executor.getAllTrxConn() {
backend2FrontendMap[k] = s
}
}
return backend2FrontendMap
}

// ReloadNamespacePrepare config change prepare phase
func (s *Server) ReloadNamespacePrepare(name string, client models.Client) error {
// get namespace conf from etcd
Expand Down
Loading