Skip to content

Commit

Permalink
Merge pull request #111 from drdstech/bug_rebuildme
Browse files Browse the repository at this point in the history
mysql: fix rebuild node with small amounts of data failed
  • Loading branch information
BohuTANG authored Dec 25, 2020
2 parents f7f3594 + 61f83eb commit a0f57e0
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 25 deletions.
19 changes: 19 additions & 0 deletions src/cli/callx/callx.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ func KillMysqldRPC(node string) error {
return nil
}

func SetMysqlStateRPC(node string, state model.MysqlState) error {
cli, cleanup, err := GetClient(node)
if err != nil {
return err
}
defer cleanup()

method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = state
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
err = cli.Call(method, req, rsp)
if err != nil {
return err
}

return nil
}

func WaitMysqldShutdownRPC(node string) error {
cli, cleanup, err := GetClient(node)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions src/cli/cmd/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
// wait
err = callx.WaitMysqldShutdownRPC(self)
ErrorOK(err)

// set the mysql state to dead, avoid failure to rebuild node with small amounts of data
err = callx.SetMysqlStateRPC(self, model.MysqlDead)
ErrorOK(err)
}

// 7. check bestone is not in BACKUPING again
Expand Down
35 changes: 35 additions & 0 deletions src/model/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package model
const (
RPCMysqlStatus = "MysqlRPC.Status"
RPCMysqlGTIDSubtract = "MysqlRPC.GTIDSubtract"
RPCMysqlSetState = "MysqlRPC.SetState"
RPCMysqlSetGlobalSysVar = "MysqlRPC.SetGlobalSysVar"
RPCMysqlCreateUserWithPrivileges = "UserRPC.CreateUserWithPrivileges"
RPCMysqlCreateNormalUser = "UserRPC.CreateNormalUser"
Expand All @@ -25,6 +26,18 @@ const (
RPCMysqlIsWorking = "MysqlRPC.IsWorking"
)

type (
// State enum.
MysqlState string
)

const (
// MysqlAlive enum.
MysqlAlive MysqlState = "ALIVE"
// MysqlDead enum.
MysqlDead MysqlState = "DEAD"
)

// GTID info
type GTID struct {
// Mysql master log file which the slave is reading
Expand Down Expand Up @@ -182,6 +195,28 @@ func NewMysqlGTIDSubtractRPCResponse(code string) *MysqlGTIDSubtractRPCResponse
return &MysqlGTIDSubtractRPCResponse{RetCode: code}
}

type MysqlSetStateRPCRequest struct {
// The IP of this request
From string

// The new state
State MysqlState
}

type MysqlSetStateRPCResponse struct {
// Return code to rpc client:
// OK or other errors
RetCode string
}

func NewMysqlSetStateRPCRequest() *MysqlSetStateRPCRequest {
return &MysqlSetStateRPCRequest{}
}

func NewMysqlSetStateRPCResponse(code string) *MysqlSetStateRPCResponse {
return &MysqlSetStateRPCResponse{RetCode: code}
}

// user
type MysqlUserRPCRequest struct {
// The IP of this request
Expand Down
6 changes: 3 additions & 3 deletions src/mysql/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *Mysql) PingStop() {
// the slaves Slave_IO_Running is false, because it's in connecting state
func (m *Mysql) Promotable() bool {
log := m.log
promotable := (m.GetState() == MysqlAlive)
promotable := (m.GetState() == model.MysqlAlive)
if promotable {
gtid, err := m.GetGTID()
if err != nil {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (m *Mysql) WaitUntilAfterGTID(targetGTID string) error {
}

// GetState returns the mysql state.
func (m *Mysql) GetState() State {
func (m *Mysql) GetState() model.MysqlState {
return m.getState()
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func (m *Mysql) WaitMysqlWorks(timeout int) error {
go func() {
for {
m.Ping()
if m.GetState() == MysqlAlive {
if m.GetState() == model.MysqlAlive {
errChannel <- nil
break
}
Expand Down
5 changes: 3 additions & 2 deletions src/mysql/attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ package mysql

import (
"fmt"
"model"
)

func (m *Mysql) setState(state State) {
func (m *Mysql) setState(state model.MysqlState) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state = state
}

func (m *Mysql) getState() State {
func (m *Mysql) getState() model.MysqlState {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.state
Expand Down
18 changes: 5 additions & 13 deletions src/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,11 @@ import (
)

type (
// State enum.
State string

// Option enum.
Option string
)

const (
// MysqlAlive enum.
MysqlAlive State = "ALIVE"
// MysqlDead enum.
MysqlDead State = "DEAD"

// MysqlReadonly enum.
MysqlReadonly Option = "READONLY"
// MysqlReadwrite enum.
Expand All @@ -49,7 +41,7 @@ type Mysql struct {
db *sql.DB
conf *config.MysqlConfig
log *xlog.Log
state State
state model.MysqlState
option Option
mutex sync.RWMutex
dbmutex sync.RWMutex
Expand All @@ -66,7 +58,7 @@ func NewMysql(conf *config.MysqlConfig, queryTimeout int, log *xlog.Log) *Mysql
db: nil,
log: log,
conf: conf,
state: MysqlDead,
state: model.MysqlDead,
mysqlHandler: getHandler(conf.Version),
pingTicker: common.NormalTicker(conf.PingTimeout),
}
Expand All @@ -92,7 +84,7 @@ func (m *Mysql) Ping() {
log.Error("mysql[%v].ping.getdb.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits)
if m.downs > downsLimits {
log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits)
m.setState(MysqlDead)
m.setState(model.MysqlDead)
}
m.IncMysqlDowns()
m.downs++
Expand All @@ -103,7 +95,7 @@ func (m *Mysql) Ping() {
log.Error("mysql[%v].ping.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits)
if m.downs > downsLimits {
log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits)
m.setState(MysqlDead)
m.setState(model.MysqlDead)
}
m.IncMysqlDowns()
m.downs++
Expand All @@ -121,7 +113,7 @@ func (m *Mysql) Ping() {

// reset downs.
m.downs = 0
m.setState(MysqlAlive)
m.setState(model.MysqlAlive)
m.pingEntry = *pe
}

Expand Down
7 changes: 4 additions & 3 deletions src/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package mysql

import (
"config"
"model"
"testing"
"time"
"xbase/common"
Expand All @@ -28,7 +29,7 @@ func TestMysql(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlAlive
want := model.MysqlAlive
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand All @@ -42,7 +43,7 @@ func TestStateDead(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlDead
want := model.MysqlDead
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand All @@ -56,7 +57,7 @@ func TestCreateReplUser(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlAlive
want := model.MysqlAlive
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand Down
9 changes: 8 additions & 1 deletion src/mysql/rpc_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *MysqlRPC) StartSlave(req *model.MysqlRPCRequest, rsp *model.MysqlRPCRes

// IsWorking used to check the mysql works or not.
func (m *MysqlRPC) IsWorking(req *model.MysqlRPCRequest, rsp *model.MysqlRPCResponse) error {
if m.mysql.GetState() == MysqlAlive {
if m.mysql.GetState() == model.MysqlAlive {
rsp.RetCode = model.OK
} else {
rsp.RetCode = model.ErrorMySQLDown
Expand Down Expand Up @@ -118,3 +118,10 @@ func (m *MysqlRPC) GTIDSubtract(req *model.MysqlGTIDSubtractRPCRequest, rsp *mod
}
return nil
}

// SetState used to set the mysql state.
func (m *MysqlRPC) SetState(req *model.MysqlSetStateRPCRequest, rsp *model.MysqlSetStateRPCResponse) error {
rsp.RetCode = model.OK
m.mysql.setState(req.State)
return nil
}
41 changes: 40 additions & 1 deletion src/mysql/rpc_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,53 @@ func TestMysqlRPCStatus(t *testing.T) {
}
want := model.NewMysqlStatusRPCResponse(model.OK)
want.GTID = GTID
want.Status = string(MysqlDead)
want.Status = string(model.MysqlDead)
want.Stats = &model.MysqlStats{}

got := rsp
assert.Equal(t, want, got)
}
}

func TestMysqlRPCSetState(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
port := common.RandomPort(8000, 9000)
id, _, cleanup := MockMysql(log, port, NewMockGTIDB())
defer cleanup()

// MysqlDead
{
method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = model.MysqlDead
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
c, cleanup := MockGetClient(t, id)
defer cleanup()
err := c.Call(method, req, rsp)
assert.Nil(t, err)

want := model.NewMysqlSetStateRPCResponse(model.OK)
got := rsp
assert.Equal(t, want, got)
}

// MysqlAlive
{
method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = model.MysqlAlive
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
c, cleanup := MockGetClient(t, id)
defer cleanup()
err := c.Call(method, req, rsp)
assert.Nil(t, err)

want := model.NewMysqlSetStateRPCResponse(model.OK)
got := rsp
assert.Equal(t, want, got)
}
}

func TestMysqlRPCSetSysVar(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
port := common.RandomPort(8000, 9000)
Expand Down
3 changes: 1 addition & 2 deletions src/raft/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package raft

import (
"model"
"mysql"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -287,7 +286,7 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf
// broadcast hearbeat requests to other peers of the cluster
func (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) {
// check MySQL down
if r.mysql.GetState() == mysql.MysqlDead {
if r.mysql.GetState() == model.MysqlDead {
*mysqlDown = true
return
}
Expand Down

0 comments on commit a0f57e0

Please sign in to comment.