Skip to content

Commit

Permalink
Merge pull request #109 from drdstech/feature_check_gtid_before_rebui…
Browse files Browse the repository at this point in the history
…ldme

cli: check gtid before rebuildme
  • Loading branch information
BohuTANG authored Dec 10, 2020
2 parents 40b6ce6 + 97d27b5 commit 9a8c275
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 45 deletions.
5 changes: 3 additions & 2 deletions conf/xenon-sample.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"version":"mysql80",
"passwd":"",
"host":"localhost",
"port":${YOUR-MYSQL-PORT},
"port":"${YOUR-MYSQL-PORT}",
"basedir":"${YOUR-MYSQL-BIN-DIR}",
"defaults-file":"${YOUR-MYSQL-CNF-PATH}"
},
Expand All @@ -35,7 +35,8 @@
"ssh-passwd":"${YOUR-SSH-PWD}",
"basedir":"${YOUR-MYSQL-BIN-DIR}",
"backupdir":"${YOUR-BACKUP-DIR}",
"xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}"
"xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}",
"max-allowed-local-trx-count": "${YOUR-MAX-ALLOWED-LOCAL-TRX-COUNT}"
},

"rpc":
Expand Down
2 changes: 1 addition & 1 deletion docs/config/192.168.0.11_xenon.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"ssh-user":"ubuntu",
"ssh-passwd":"",
"basedir":"/usr",
"backup-dir":"/data/mysql",
"backupdir":"/data/mysql",
"xtrabackup-bindir":"/opt/xtrabackup/bin"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/config/192.168.0.2_xenon.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"ssh-user":"ubuntu",
"ssh-passwd":"",
"basedir":"/usr",
"backup-dir":"/data/mysql",
"backupdir":"/data/mysql",
"xtrabackup-bindir":"/opt/xtrabackup/bin"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/config/192.168.0.3_xenon.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"ssh-user":"ubuntu",
"ssh-passwd":"",
"basedir":"/usr",
"backup-dir":"/data/mysql",
"backupdir":"/data/mysql",
"xtrabackup-bindir":"/opt/xtrabackup/bin"
},
Expand Down
5 changes: 3 additions & 2 deletions docs/config/xenon-simple.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@
"ssh-user":"ubuntu",
"ssh-passwd":"ubuntu",
"ssh-port":22,
"backup-dir":"/home/ubuntu/data3306",
"backupdir":"/home/ubuntu/data3306",
"xtrabackup-bindir":"/home/ubuntu/xtrabackup_20161216",
"backup-iops-limits":100000,
"backup-use-memory": "2GB",
"backup-parallel": 2
"backup-parallel": 2,
"max-allowed-local-trx-count": 0
},

"rpc":
Expand Down
2 changes: 1 addition & 1 deletion docs/how_to_build_and_run_xenon.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ $ sudo vi /etc/xenon/xenon.json
"ssh-user":"${YOUR-SSH-USER}",
"ssh-passwd":"${YOUR-SSH-PWD}",
"basedir":"${YOUR-MYSQL-BIN-DIR}",
"backup-dir":"${YOUR-BACKUP-DIR}",
"backupdir":"${YOUR-BACKUP-DIR}",
"xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}"
},

Expand Down
17 changes: 17 additions & 0 deletions src/cli/callx/callx.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,23 @@ func GetGTIDRPC(node string) (*model.MysqlStatusRPCResponse, error) {
return rsp, err
}

func GetGTIDSubtractRPC(node string, subsetGTID string, setGTID string) (*model.MysqlGTIDSubtractRPCResponse, error) {
cli, cleanup, err := GetClient(node)
if err != nil {
return nil, err
}
defer cleanup()

method := model.RPCMysqlGTIDSubtract
req := model.NewMysqlGTIDSubtractRPCRequest()
req.SubsetGTID = subsetGTID
req.SetGTID = setGTID
rsp := model.NewMysqlGTIDSubtractRPCResponse(model.OK)
err = cli.Call(method, req, rsp)

return rsp, err
}

// GetMysqlUserRPC get mysql user
func GetMysqlUserRPC(node string) (*model.MysqlUserRPCResponse, error) {
cli, cleanup, err := GetClient(node)
Expand Down
74 changes: 70 additions & 4 deletions src/cli/cmd/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"model"
"path"
"strconv"
"strings"
"time"
"xbase/common"
Expand Down Expand Up @@ -150,19 +151,67 @@ func mysqlShutDownCommandFn(cmd *cobra.Command, args []string) {
// rebuild me
var (
fromStr string
force bool
)

func NewMysqlRebuildMeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "rebuildme [--from=endpoint]",
Short: "rebuild a slave --from=endpoint",
Use: "rebuildme [--from=endpoint][--force]",
Short: "rebuild a slave --from=endpoint --force",
Run: mysqlRebuildMeCommandFn,
}
cmd.Flags().StringVar(&fromStr, "from", "", "--from=endpoint")
cmd.Flags().BoolVar(&force, "force", false, "--force")

return cmd
}

func getLocalTrxCount(self string, bestone string) (int, error) {
count := 0

rsp1, err := callx.GetGTIDRPC(bestone)
if err != nil {
return -1, fmt.Errorf("get.gtid.from.bestone[%v].failed[%v]", bestone, err)
} else if rsp1.GTID.Executed_GTID_Set == "" {
return -1, fmt.Errorf("the.Executed_GTID_Set.of.bestone[%v].is.null", bestone)
}
fromGTID := rsp1.GTID

rsp1, err = callx.GetGTIDRPC(self)
if err != nil {
return -1, fmt.Errorf("get.gtid.from.myself[%v].failed[%v]", self, err)
} else if rsp1.GTID.Executed_GTID_Set == "" {
return -1, fmt.Errorf("the.Executed_GTID_Set.of.myself[%v].is.null", self)
}
localGTID := rsp1.GTID

rsp2, err := callx.GetGTIDSubtractRPC(self, localGTID.Executed_GTID_Set, fromGTID.Executed_GTID_Set)
if err != nil {
return -1, fmt.Errorf("get.gtid.subtract.from.self[%v].failed[%v]", self, err)
}
subtract := rsp2.Subtract

// compute the number of local transactions
gtidSet := strings.Split(subtract, "\n")
for _, gtid := range gtidSet {
gtid = strings.TrimSpace(gtid)
gtid = strings.TrimSuffix(gtid, ",")
diffs := strings.Split(gtid, ":")[1:]
for _, diff := range diffs {
values := strings.Split(diff, "-")
if len(values) == 1 {
count += 1
} else {
s, _ := strconv.Atoi(values[0])
e, _ := strconv.Atoi(values[1])
count += e - s + 1
}
}
}

return count, nil
}

func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
if len(args) != 0 {
ErrorOK(fmt.Errorf("too.many.args"))
Expand All @@ -183,18 +232,19 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
datadir := conf.Backup.BackupDir
binlogDir := ""
binlogPrefix := ""
maxAllowedLocalTrxCount := conf.Backup.MaxAllowedLocalTrxCount

// 1. first to check I am leader or not
{
log.Warning("S1-->check.raft.leader")
leader, err := callx.GetClusterLeader(self)
ErrorOK(err)
if leader == self {
log.Panic("[%v].I.am.leader.you.cant.rebuildme.sir", self)
log.Panic("I[%v].am.leader.you.cant.rebuildme.sir", self)
}
}

// 2. find the best to backup
// 2. find the best to backup and check gtid
{
if fromStr != "" {
bestone = fromStr
Expand All @@ -203,6 +253,22 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
ErrorOK(err)
}
log.Warning("S2-->prepare.rebuild.from[%v]....", bestone)

// check if there are more than 2 local transactions on the local node than bestone
if force {
log.Warning("S2-->the.[--force].is.specified.skip.check.gtid")
} else {
working, err := callx.MysqlIsWorkingRPC(self)
ErrorOK(err)
if working == false {
log.Panic("local.mysql.is.not.working.you.cant.rebuildme.sir")
}
localTrxCount, err := getLocalTrxCount(self, bestone)
ErrorOK(err)
if localTrxCount > maxAllowedLocalTrxCount {
log.Panic("I[%v].have.[%v].local.transactions.more.than.maxAllowedLocalTrxCount[%v].compared.to.from[%v].you.cant.rebuildme.sir", self, localTrxCount, maxAllowedLocalTrxCount, bestone)
}
}
}

// 3. check bestone is not in BACKUPING
Expand Down
48 changes: 25 additions & 23 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,17 @@ func (c *ReplicationConfig) UnmarshalJSON(b []byte) error {

type BackupConfig struct {
// MUST: set in init
SSHHost string `json:"ssh-host"`
SSHUser string `json:"ssh-user"`
SSHPasswd string `json:"ssh-passwd"`
SSHPort int `json:"ssh-port"`
BackupDir string `json:"backupdir"`
XtrabackupBinDir string `json:"xtrabackup-bindir"`
BackupIOPSLimits int `json:"backup-iops-limits"`
UseMemory string `json:"backup-use-memory"`
Parallel int `json:"backup-parallel"`
MysqldMonitorInterval int `json:"mysqld-monitor-interval"`
SSHHost string `json:"ssh-host"`
SSHUser string `json:"ssh-user"`
SSHPasswd string `json:"ssh-passwd"`
SSHPort int `json:"ssh-port"`
BackupDir string `json:"backupdir"`
XtrabackupBinDir string `json:"xtrabackup-bindir"`
BackupIOPSLimits int `json:"backup-iops-limits"`
UseMemory string `json:"backup-use-memory"`
Parallel int `json:"backup-parallel"`
MysqldMonitorInterval int `json:"mysqld-monitor-interval"`
MaxAllowedLocalTrxCount int `json:"max-allowed-local-trx-count"`

// mysql admin
Admin string
Expand All @@ -232,19 +233,20 @@ type BackupConfig struct {

func DefaultBackupConfig() *BackupConfig {
return &BackupConfig{
SSHPort: 22,
BackupDir: "/u01/backup",
XtrabackupBinDir: ".",
BackupIOPSLimits: 100000,
UseMemory: "2GB",
Parallel: 2,
MysqldMonitorInterval: 1000 * 1,
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
SSHPort: 22,
BackupDir: "/u01/backup",
XtrabackupBinDir: ".",
BackupIOPSLimits: 100000,
UseMemory: "2GB",
Parallel: 2,
MysqldMonitorInterval: 1000 * 1,
MaxAllowedLocalTrxCount: 0,
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
}
}

Expand Down
29 changes: 29 additions & 0 deletions src/model/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package model

const (
RPCMysqlStatus = "MysqlRPC.Status"
RPCMysqlGTIDSubtract = "MysqlRPC.GTIDSubtract"
RPCMysqlSetGlobalSysVar = "MysqlRPC.SetGlobalSysVar"
RPCMysqlCreateUserWithPrivileges = "UserRPC.CreateUserWithPrivileges"
RPCMysqlCreateNormalUser = "UserRPC.CreateNormalUser"
Expand Down Expand Up @@ -153,6 +154,34 @@ func NewMysqlStatusRPCResponse(code string) *MysqlStatusRPCResponse {
return &MysqlStatusRPCResponse{RetCode: code}
}

type MysqlGTIDSubtractRPCRequest struct {
// The IP of this request
From string

// The first parameter of the function GTID_SUBTRACT
SubsetGTID string

// The second parameter of the function GTID_SUBTRACT
SetGTID string
}

type MysqlGTIDSubtractRPCResponse struct {
// The GTID Subtract of this request
Subtract string

// Return code to rpc client:
// OK or other errors
RetCode string
}

func NewMysqlGTIDSubtractRPCRequest() *MysqlGTIDSubtractRPCRequest {
return &MysqlGTIDSubtractRPCRequest{}
}

func NewMysqlGTIDSubtractRPCResponse(code string) *MysqlGTIDSubtractRPCResponse {
return &MysqlGTIDSubtractRPCResponse{RetCode: code}
}

// user
type MysqlUserRPCRequest struct {
// The IP of this request
Expand Down
6 changes: 3 additions & 3 deletions src/mysql/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) b

// gtid_sub is not none, means the follower gtid is bigger than candidate gtid
// if viewdiff<=0 and gtid_sub is not null it must be localcommitted
gtid_sub, err := m.GetGtidSubtract(fGTID, cGTID)
gtid_sub, err := m.GetGTIDSubtract(fGTID, cGTID)
if err != nil {
log.Error("mysql.CheckGTID.error[%v]", err)
return false
Expand All @@ -176,12 +176,12 @@ func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) b
return false
}

func (m *Mysql) GetGtidSubtract(subsetGTID string, setGTID string) (string, error) {
func (m *Mysql) GetGTIDSubtract(subsetGTID string, setGTID string) (string, error) {
db, err := m.getDB()
if err != nil {
return "", err
}
return m.mysqlHandler.GetGtidSubtract(db, subsetGTID, setGTID)
return m.mysqlHandler.GetGTIDSubtract(db, subsetGTID, setGTID)
}

// StartSlaveIOThread used to start the slave io thread.
Expand Down
4 changes: 2 additions & 2 deletions src/mysql/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func DefaultCheckGTID(followerGTID *model.GTID, leaderGTID *model.GTID) bool {
return false
}

// GetGtidSubtract mock.
func (mogtid *MockGTID) GetGtidSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) {
// GetGTIDSubtract mock.
func (mogtid *MockGTID) GetGTIDSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) {
return mogtid.GetGtidSubtractFn(db, slaveGTID, masterGTID)
}

Expand Down
2 changes: 1 addition & 1 deletion src/mysql/mysql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type MysqlHandler interface {
GetUUID(db *sql.DB) (string, error)

// get gtid subtract with slavegtid and master gtid
GetGtidSubtract(*sql.DB, string, string) (string, error)
GetGTIDSubtract(*sql.DB, string, string) (string, error)

// set global variables
SetGlobalSysVar(db *sql.DB, varsql string) error
Expand Down
4 changes: 2 additions & 2 deletions src/mysql/mysqlbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func (my *MysqlBase) WaitUntilAfterGTID(db *sql.DB, targetGTID string) error {
return Execute(db, query)
}

// GetGtidSubtract used to do "SELECT GTID_SUBTRACT('subsetGTID','setGTID') as gtid_sub" command
func (my *MysqlBase) GetGtidSubtract(db *sql.DB, subsetGTID string, setGTID string) (string, error) {
// GetGTIDSubtract used to do "SELECT GTID_SUBTRACT('subsetGTID','setGTID') as gtid_sub" command
func (my *MysqlBase) GetGTIDSubtract(db *sql.DB, subsetGTID string, setGTID string) (string, error) {
query := fmt.Sprintf("SELECT GTID_SUBTRACT('%s','%s') as gtid_sub", subsetGTID, setGTID)
rows, err := QueryWithTimeout(db, my.queryTimeout, query)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions src/mysql/rpc_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,15 @@ func (m *MysqlRPC) Status(req *model.MysqlStatusRPCRequest, rsp *model.MysqlStat
rsp.Stats = m.mysql.getStats()
return nil
}

// GTIDSubstract returns the mysql GTID subtract info.
func (m *MysqlRPC) GTIDSubtract(req *model.MysqlGTIDSubtractRPCRequest, rsp *model.MysqlGTIDSubtractRPCResponse) error {
var err error

rsp.RetCode = model.OK
if rsp.Subtract, err = m.mysql.GetGTIDSubtract(req.SubsetGTID, req.SetGTID); err != nil {
rsp.RetCode = err.Error()
return nil
}
return nil
}
Loading

0 comments on commit 9a8c275

Please sign in to comment.