diff --git a/src/cli/callx/callx.go b/src/cli/callx/callx.go index 78085f6..90c5a45 100644 --- a/src/cli/callx/callx.go +++ b/src/cli/callx/callx.go @@ -315,6 +315,25 @@ func KillMysqldRPC(node string) error { return nil } +func SetMysqlStateRPC(node string, state model.MysqlState) error { + cli, cleanup, err := GetClient(node) + if err != nil { + return err + } + defer cleanup() + + method := model.RPCMysqlSetState + req := model.NewMysqlSetStateRPCRequest() + req.State = state + rsp := model.NewMysqlSetStateRPCResponse(model.OK) + err = cli.Call(method, req, rsp) + if err != nil { + return err + } + + return nil +} + func WaitMysqldShutdownRPC(node string) error { cli, cleanup, err := GetClient(node) if err != nil { diff --git a/src/cli/cmd/mysql.go b/src/cli/cmd/mysql.go index f74ac02..0f24882 100644 --- a/src/cli/cmd/mysql.go +++ b/src/cli/cmd/mysql.go @@ -305,6 +305,10 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) { // wait err = callx.WaitMysqldShutdownRPC(self) ErrorOK(err) + + // set the mysql state to dead, avoid failure to rebuild node with small amounts of data + err = callx.SetMysqlStateRPC(self, model.MysqlDead) + ErrorOK(err) } // 7. check bestone is not in BACKUPING again diff --git a/src/model/mysql.go b/src/model/mysql.go index 953cb6b..af68c7a 100644 --- a/src/model/mysql.go +++ b/src/model/mysql.go @@ -11,6 +11,7 @@ package model const ( RPCMysqlStatus = "MysqlRPC.Status" RPCMysqlGTIDSubtract = "MysqlRPC.GTIDSubtract" + RPCMysqlSetState = "MysqlRPC.SetState" RPCMysqlSetGlobalSysVar = "MysqlRPC.SetGlobalSysVar" RPCMysqlCreateUserWithPrivileges = "UserRPC.CreateUserWithPrivileges" RPCMysqlCreateNormalUser = "UserRPC.CreateNormalUser" @@ -25,6 +26,18 @@ const ( RPCMysqlIsWorking = "MysqlRPC.IsWorking" ) +type ( + // State enum. + MysqlState string +) + +const ( + // MysqlAlive enum. + MysqlAlive MysqlState = "ALIVE" + // MysqlDead enum. + MysqlDead MysqlState = "DEAD" +) + // GTID info type GTID struct { // Mysql master log file which the slave is reading @@ -182,6 +195,28 @@ func NewMysqlGTIDSubtractRPCResponse(code string) *MysqlGTIDSubtractRPCResponse return &MysqlGTIDSubtractRPCResponse{RetCode: code} } +type MysqlSetStateRPCRequest struct { + // The IP of this request + From string + + // The new state + State MysqlState +} + +type MysqlSetStateRPCResponse struct { + // Return code to rpc client: + // OK or other errors + RetCode string +} + +func NewMysqlSetStateRPCRequest() *MysqlSetStateRPCRequest { + return &MysqlSetStateRPCRequest{} +} + +func NewMysqlSetStateRPCResponse(code string) *MysqlSetStateRPCResponse { + return &MysqlSetStateRPCResponse{RetCode: code} +} + // user type MysqlUserRPCRequest struct { // The IP of this request diff --git a/src/mysql/api.go b/src/mysql/api.go index 3aa4add..7c303f1 100644 --- a/src/mysql/api.go +++ b/src/mysql/api.go @@ -44,7 +44,7 @@ func (m *Mysql) PingStop() { // the slaves Slave_IO_Running is false, because it's in connecting state func (m *Mysql) Promotable() bool { log := m.log - promotable := (m.GetState() == MysqlAlive) + promotable := (m.GetState() == model.MysqlAlive) if promotable { gtid, err := m.GetGTID() if err != nil { @@ -257,7 +257,7 @@ func (m *Mysql) WaitUntilAfterGTID(targetGTID string) error { } // GetState returns the mysql state. -func (m *Mysql) GetState() State { +func (m *Mysql) GetState() model.MysqlState { return m.getState() } @@ -315,7 +315,7 @@ func (m *Mysql) WaitMysqlWorks(timeout int) error { go func() { for { m.Ping() - if m.GetState() == MysqlAlive { + if m.GetState() == model.MysqlAlive { errChannel <- nil break } diff --git a/src/mysql/attr.go b/src/mysql/attr.go index 331c79f..5451233 100644 --- a/src/mysql/attr.go +++ b/src/mysql/attr.go @@ -10,15 +10,16 @@ package mysql import ( "fmt" + "model" ) -func (m *Mysql) setState(state State) { +func (m *Mysql) setState(state model.MysqlState) { m.mutex.Lock() defer m.mutex.Unlock() m.state = state } -func (m *Mysql) getState() State { +func (m *Mysql) getState() model.MysqlState { m.mutex.RLock() defer m.mutex.RUnlock() return m.state diff --git a/src/mysql/mysql.go b/src/mysql/mysql.go index b282b22..3ce56f0 100644 --- a/src/mysql/mysql.go +++ b/src/mysql/mysql.go @@ -20,19 +20,11 @@ import ( ) type ( - // State enum. - State string - // Option enum. Option string ) const ( - // MysqlAlive enum. - MysqlAlive State = "ALIVE" - // MysqlDead enum. - MysqlDead State = "DEAD" - // MysqlReadonly enum. MysqlReadonly Option = "READONLY" // MysqlReadwrite enum. @@ -49,7 +41,7 @@ type Mysql struct { db *sql.DB conf *config.MysqlConfig log *xlog.Log - state State + state model.MysqlState option Option mutex sync.RWMutex dbmutex sync.RWMutex @@ -66,7 +58,7 @@ func NewMysql(conf *config.MysqlConfig, queryTimeout int, log *xlog.Log) *Mysql db: nil, log: log, conf: conf, - state: MysqlDead, + state: model.MysqlDead, mysqlHandler: getHandler(conf.Version), pingTicker: common.NormalTicker(conf.PingTimeout), } @@ -92,7 +84,7 @@ func (m *Mysql) Ping() { log.Error("mysql[%v].ping.getdb.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits) if m.downs > downsLimits { log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits) - m.setState(MysqlDead) + m.setState(model.MysqlDead) } m.IncMysqlDowns() m.downs++ @@ -103,7 +95,7 @@ func (m *Mysql) Ping() { log.Error("mysql[%v].ping.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits) if m.downs > downsLimits { log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits) - m.setState(MysqlDead) + m.setState(model.MysqlDead) } m.IncMysqlDowns() m.downs++ @@ -121,7 +113,7 @@ func (m *Mysql) Ping() { // reset downs. m.downs = 0 - m.setState(MysqlAlive) + m.setState(model.MysqlAlive) m.pingEntry = *pe } diff --git a/src/mysql/mysql_test.go b/src/mysql/mysql_test.go index 8edfa4a..8cd4402 100644 --- a/src/mysql/mysql_test.go +++ b/src/mysql/mysql_test.go @@ -10,6 +10,7 @@ package mysql import ( "config" + "model" "testing" "time" "xbase/common" @@ -28,7 +29,7 @@ func TestMysql(t *testing.T) { time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond) got := mysql.GetState() - want := MysqlAlive + want := model.MysqlAlive assert.Equal(t, want, got) mysql.PingStop() } @@ -42,7 +43,7 @@ func TestStateDead(t *testing.T) { time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond) got := mysql.GetState() - want := MysqlDead + want := model.MysqlDead assert.Equal(t, want, got) mysql.PingStop() } @@ -56,7 +57,7 @@ func TestCreateReplUser(t *testing.T) { time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond) got := mysql.GetState() - want := MysqlAlive + want := model.MysqlAlive assert.Equal(t, want, got) mysql.PingStop() } diff --git a/src/mysql/rpc_mysql.go b/src/mysql/rpc_mysql.go index c2be817..0b3ccfe 100644 --- a/src/mysql/rpc_mysql.go +++ b/src/mysql/rpc_mysql.go @@ -84,7 +84,7 @@ func (m *MysqlRPC) StartSlave(req *model.MysqlRPCRequest, rsp *model.MysqlRPCRes // IsWorking used to check the mysql works or not. func (m *MysqlRPC) IsWorking(req *model.MysqlRPCRequest, rsp *model.MysqlRPCResponse) error { - if m.mysql.GetState() == MysqlAlive { + if m.mysql.GetState() == model.MysqlAlive { rsp.RetCode = model.OK } else { rsp.RetCode = model.ErrorMySQLDown @@ -118,3 +118,10 @@ func (m *MysqlRPC) GTIDSubtract(req *model.MysqlGTIDSubtractRPCRequest, rsp *mod } return nil } + +// SetState used to set the mysql state. +func (m *MysqlRPC) SetState(req *model.MysqlSetStateRPCRequest, rsp *model.MysqlSetStateRPCResponse) error { + rsp.RetCode = model.OK + m.mysql.setState(req.State) + return nil +} diff --git a/src/mysql/rpc_mysql_test.go b/src/mysql/rpc_mysql_test.go index 241d73f..0c6795e 100644 --- a/src/mysql/rpc_mysql_test.go +++ b/src/mysql/rpc_mysql_test.go @@ -42,7 +42,7 @@ func TestMysqlRPCStatus(t *testing.T) { } want := model.NewMysqlStatusRPCResponse(model.OK) want.GTID = GTID - want.Status = string(MysqlDead) + want.Status = string(model.MysqlDead) want.Stats = &model.MysqlStats{} got := rsp @@ -50,6 +50,45 @@ func TestMysqlRPCStatus(t *testing.T) { } } +func TestMysqlRPCSetState(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + port := common.RandomPort(8000, 9000) + id, _, cleanup := MockMysql(log, port, NewMockGTIDB()) + defer cleanup() + + // MysqlDead + { + method := model.RPCMysqlSetState + req := model.NewMysqlSetStateRPCRequest() + req.State = model.MysqlDead + rsp := model.NewMysqlSetStateRPCResponse(model.OK) + c, cleanup := MockGetClient(t, id) + defer cleanup() + err := c.Call(method, req, rsp) + assert.Nil(t, err) + + want := model.NewMysqlSetStateRPCResponse(model.OK) + got := rsp + assert.Equal(t, want, got) + } + + // MysqlAlive + { + method := model.RPCMysqlSetState + req := model.NewMysqlSetStateRPCRequest() + req.State = model.MysqlAlive + rsp := model.NewMysqlSetStateRPCResponse(model.OK) + c, cleanup := MockGetClient(t, id) + defer cleanup() + err := c.Call(method, req, rsp) + assert.Nil(t, err) + + want := model.NewMysqlSetStateRPCResponse(model.OK) + got := rsp + assert.Equal(t, want, got) + } +} + func TestMysqlRPCSetSysVar(t *testing.T) { log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) port := common.RandomPort(8000, 9000) diff --git a/src/raft/leader.go b/src/raft/leader.go index 360db16..c64f5b7 100644 --- a/src/raft/leader.go +++ b/src/raft/leader.go @@ -10,7 +10,6 @@ package raft import ( "model" - "mysql" "strings" "sync" "time" @@ -287,7 +286,7 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf // broadcast hearbeat requests to other peers of the cluster func (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) { // check MySQL down - if r.mysql.GetState() == mysql.MysqlDead { + if r.mysql.GetState() == model.MysqlDead { *mysqlDown = true return }