diff --git a/conf/xenon-sample.conf.json b/conf/xenon-sample.conf.json index 15d6648..84fc4c3 100644 --- a/conf/xenon-sample.conf.json +++ b/conf/xenon-sample.conf.json @@ -17,7 +17,7 @@ "version":"mysql80", "passwd":"", "host":"localhost", - "port":${YOUR-MYSQL-PORT}, + "port":"${YOUR-MYSQL-PORT}", "basedir":"${YOUR-MYSQL-BIN-DIR}", "defaults-file":"${YOUR-MYSQL-CNF-PATH}" }, @@ -35,7 +35,8 @@ "ssh-passwd":"${YOUR-SSH-PWD}", "basedir":"${YOUR-MYSQL-BIN-DIR}", "backupdir":"${YOUR-BACKUP-DIR}", - "xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}" + "xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}", + "max-allowed-local-trx-count": "${YOUR-MAX-ALLOWED-LOCAL-TRX-COUNT}" }, "rpc": diff --git a/docs/config/192.168.0.11_xenon.md b/docs/config/192.168.0.11_xenon.md index 7b576ac..5b92c80 100644 --- a/docs/config/192.168.0.11_xenon.md +++ b/docs/config/192.168.0.11_xenon.md @@ -34,7 +34,7 @@ "ssh-user":"ubuntu", "ssh-passwd":"", "basedir":"/usr", - "backup-dir":"/data/mysql", + "backupdir":"/data/mysql", "xtrabackup-bindir":"/opt/xtrabackup/bin" }, diff --git a/docs/config/192.168.0.2_xenon.md b/docs/config/192.168.0.2_xenon.md index 90d8efe..127e6d1 100644 --- a/docs/config/192.168.0.2_xenon.md +++ b/docs/config/192.168.0.2_xenon.md @@ -34,7 +34,7 @@ "ssh-user":"ubuntu", "ssh-passwd":"", "basedir":"/usr", - "backup-dir":"/data/mysql", + "backupdir":"/data/mysql", "xtrabackup-bindir":"/opt/xtrabackup/bin" }, diff --git a/docs/config/192.168.0.3_xenon.md b/docs/config/192.168.0.3_xenon.md index cdb3d8e..aa6d5b1 100644 --- a/docs/config/192.168.0.3_xenon.md +++ b/docs/config/192.168.0.3_xenon.md @@ -34,7 +34,7 @@ "ssh-user":"ubuntu", "ssh-passwd":"", "basedir":"/usr", - "backup-dir":"/data/mysql", + "backupdir":"/data/mysql", "xtrabackup-bindir":"/opt/xtrabackup/bin" }, diff --git a/docs/config/xenon-simple.conf.json b/docs/config/xenon-simple.conf.json index ba54d92..da477e7 100644 --- a/docs/config/xenon-simple.conf.json +++ b/docs/config/xenon-simple.conf.json @@ -38,11 +38,12 @@ "ssh-user":"ubuntu", "ssh-passwd":"ubuntu", "ssh-port":22, - "backup-dir":"/home/ubuntu/data3306", + "backupdir":"/home/ubuntu/data3306", "xtrabackup-bindir":"/home/ubuntu/xtrabackup_20161216", "backup-iops-limits":100000, "backup-use-memory": "2GB", - "backup-parallel": 2 + "backup-parallel": 2, + "max-allowed-local-trx-count": 0 }, "rpc": diff --git a/docs/how_to_build_and_run_xenon.md b/docs/how_to_build_and_run_xenon.md index 7bb9d25..f035fe2 100644 --- a/docs/how_to_build_and_run_xenon.md +++ b/docs/how_to_build_and_run_xenon.md @@ -145,7 +145,7 @@ $ sudo vi /etc/xenon/xenon.json "ssh-user":"${YOUR-SSH-USER}", "ssh-passwd":"${YOUR-SSH-PWD}", "basedir":"${YOUR-MYSQL-BIN-DIR}", - "backup-dir":"${YOUR-BACKUP-DIR}", + "backupdir":"${YOUR-BACKUP-DIR}", "xtrabackup-bindir":"${YOUR-XTRABACKUP-BIN-DIR}" }, diff --git a/src/cli/callx/callx.go b/src/cli/callx/callx.go index 94bfde5..78085f6 100644 --- a/src/cli/callx/callx.go +++ b/src/cli/callx/callx.go @@ -753,6 +753,23 @@ func GetGTIDRPC(node string) (*model.MysqlStatusRPCResponse, error) { return rsp, err } +func GetGTIDSubtractRPC(node string, subsetGTID string, setGTID string) (*model.MysqlGTIDSubtractRPCResponse, error) { + cli, cleanup, err := GetClient(node) + if err != nil { + return nil, err + } + defer cleanup() + + method := model.RPCMysqlGTIDSubtract + req := model.NewMysqlGTIDSubtractRPCRequest() + req.SubsetGTID = subsetGTID + req.SetGTID = setGTID + rsp := model.NewMysqlGTIDSubtractRPCResponse(model.OK) + err = cli.Call(method, req, rsp) + + return rsp, err +} + // GetMysqlUserRPC get mysql user func GetMysqlUserRPC(node string) (*model.MysqlUserRPCResponse, error) { cli, cleanup, err := GetClient(node) diff --git a/src/cli/cmd/mysql.go b/src/cli/cmd/mysql.go index 1e972da..f74ac02 100644 --- a/src/cli/cmd/mysql.go +++ b/src/cli/cmd/mysql.go @@ -14,6 +14,7 @@ import ( "fmt" "model" "path" + "strconv" "strings" "time" "xbase/common" @@ -150,19 +151,67 @@ func mysqlShutDownCommandFn(cmd *cobra.Command, args []string) { // rebuild me var ( fromStr string + force bool ) func NewMysqlRebuildMeCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "rebuildme [--from=endpoint]", - Short: "rebuild a slave --from=endpoint", + Use: "rebuildme [--from=endpoint][--force]", + Short: "rebuild a slave --from=endpoint --force", Run: mysqlRebuildMeCommandFn, } cmd.Flags().StringVar(&fromStr, "from", "", "--from=endpoint") + cmd.Flags().BoolVar(&force, "force", false, "--force") return cmd } +func getLocalTrxCount(self string, bestone string) (int, error) { + count := 0 + + rsp1, err := callx.GetGTIDRPC(bestone) + if err != nil { + return -1, fmt.Errorf("get.gtid.from.bestone[%v].failed[%v]", bestone, err) + } else if rsp1.GTID.Executed_GTID_Set == "" { + return -1, fmt.Errorf("the.Executed_GTID_Set.of.bestone[%v].is.null", bestone) + } + fromGTID := rsp1.GTID + + rsp1, err = callx.GetGTIDRPC(self) + if err != nil { + return -1, fmt.Errorf("get.gtid.from.myself[%v].failed[%v]", self, err) + } else if rsp1.GTID.Executed_GTID_Set == "" { + return -1, fmt.Errorf("the.Executed_GTID_Set.of.myself[%v].is.null", self) + } + localGTID := rsp1.GTID + + rsp2, err := callx.GetGTIDSubtractRPC(self, localGTID.Executed_GTID_Set, fromGTID.Executed_GTID_Set) + if err != nil { + return -1, fmt.Errorf("get.gtid.subtract.from.self[%v].failed[%v]", self, err) + } + subtract := rsp2.Subtract + + // compute the number of local transactions + gtidSet := strings.Split(subtract, "\n") + for _, gtid := range gtidSet { + gtid = strings.TrimSpace(gtid) + gtid = strings.TrimSuffix(gtid, ",") + diffs := strings.Split(gtid, ":")[1:] + for _, diff := range diffs { + values := strings.Split(diff, "-") + if len(values) == 1 { + count += 1 + } else { + s, _ := strconv.Atoi(values[0]) + e, _ := strconv.Atoi(values[1]) + count += e - s + 1 + } + } + } + + return count, nil +} + func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) { if len(args) != 0 { ErrorOK(fmt.Errorf("too.many.args")) @@ -183,6 +232,7 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) { datadir := conf.Backup.BackupDir binlogDir := "" binlogPrefix := "" + maxAllowedLocalTrxCount := conf.Backup.MaxAllowedLocalTrxCount // 1. first to check I am leader or not { @@ -190,11 +240,11 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) { leader, err := callx.GetClusterLeader(self) ErrorOK(err) if leader == self { - log.Panic("[%v].I.am.leader.you.cant.rebuildme.sir", self) + log.Panic("I[%v].am.leader.you.cant.rebuildme.sir", self) } } - // 2. find the best to backup + // 2. find the best to backup and check gtid { if fromStr != "" { bestone = fromStr @@ -203,6 +253,22 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) { ErrorOK(err) } log.Warning("S2-->prepare.rebuild.from[%v]....", bestone) + + // check if there are more than 2 local transactions on the local node than bestone + if force { + log.Warning("S2-->the.[--force].is.specified.skip.check.gtid") + } else { + working, err := callx.MysqlIsWorkingRPC(self) + ErrorOK(err) + if working == false { + log.Panic("local.mysql.is.not.working.you.cant.rebuildme.sir") + } + localTrxCount, err := getLocalTrxCount(self, bestone) + ErrorOK(err) + if localTrxCount > maxAllowedLocalTrxCount { + log.Panic("I[%v].have.[%v].local.transactions.more.than.maxAllowedLocalTrxCount[%v].compared.to.from[%v].you.cant.rebuildme.sir", self, localTrxCount, maxAllowedLocalTrxCount, bestone) + } + } } // 3. check bestone is not in BACKUPING diff --git a/src/config/config.go b/src/config/config.go index ec0f372..a818cb0 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -200,16 +200,17 @@ func (c *ReplicationConfig) UnmarshalJSON(b []byte) error { type BackupConfig struct { // MUST: set in init - SSHHost string `json:"ssh-host"` - SSHUser string `json:"ssh-user"` - SSHPasswd string `json:"ssh-passwd"` - SSHPort int `json:"ssh-port"` - BackupDir string `json:"backupdir"` - XtrabackupBinDir string `json:"xtrabackup-bindir"` - BackupIOPSLimits int `json:"backup-iops-limits"` - UseMemory string `json:"backup-use-memory"` - Parallel int `json:"backup-parallel"` - MysqldMonitorInterval int `json:"mysqld-monitor-interval"` + SSHHost string `json:"ssh-host"` + SSHUser string `json:"ssh-user"` + SSHPasswd string `json:"ssh-passwd"` + SSHPort int `json:"ssh-port"` + BackupDir string `json:"backupdir"` + XtrabackupBinDir string `json:"xtrabackup-bindir"` + BackupIOPSLimits int `json:"backup-iops-limits"` + UseMemory string `json:"backup-use-memory"` + Parallel int `json:"backup-parallel"` + MysqldMonitorInterval int `json:"mysqld-monitor-interval"` + MaxAllowedLocalTrxCount int `json:"max-allowed-local-trx-count"` // mysql admin Admin string @@ -232,19 +233,20 @@ type BackupConfig struct { func DefaultBackupConfig() *BackupConfig { return &BackupConfig{ - SSHPort: 22, - BackupDir: "/u01/backup", - XtrabackupBinDir: ".", - BackupIOPSLimits: 100000, - UseMemory: "2GB", - Parallel: 2, - MysqldMonitorInterval: 1000 * 1, - Admin: "root", - Passwd: "", - Host: "localhost", - Port: 3306, - Basedir: "/u01/mysql_20160606/", - DefaultsFile: "/etc/my3306.cnf", + SSHPort: 22, + BackupDir: "/u01/backup", + XtrabackupBinDir: ".", + BackupIOPSLimits: 100000, + UseMemory: "2GB", + Parallel: 2, + MysqldMonitorInterval: 1000 * 1, + MaxAllowedLocalTrxCount: 0, + Admin: "root", + Passwd: "", + Host: "localhost", + Port: 3306, + Basedir: "/u01/mysql_20160606/", + DefaultsFile: "/etc/my3306.cnf", } } diff --git a/src/model/mysql.go b/src/model/mysql.go index 859c476..953cb6b 100644 --- a/src/model/mysql.go +++ b/src/model/mysql.go @@ -10,6 +10,7 @@ package model const ( RPCMysqlStatus = "MysqlRPC.Status" + RPCMysqlGTIDSubtract = "MysqlRPC.GTIDSubtract" RPCMysqlSetGlobalSysVar = "MysqlRPC.SetGlobalSysVar" RPCMysqlCreateUserWithPrivileges = "UserRPC.CreateUserWithPrivileges" RPCMysqlCreateNormalUser = "UserRPC.CreateNormalUser" @@ -153,6 +154,34 @@ func NewMysqlStatusRPCResponse(code string) *MysqlStatusRPCResponse { return &MysqlStatusRPCResponse{RetCode: code} } +type MysqlGTIDSubtractRPCRequest struct { + // The IP of this request + From string + + // The first parameter of the function GTID_SUBTRACT + SubsetGTID string + + // The second parameter of the function GTID_SUBTRACT + SetGTID string +} + +type MysqlGTIDSubtractRPCResponse struct { + // The GTID Subtract of this request + Subtract string + + // Return code to rpc client: + // OK or other errors + RetCode string +} + +func NewMysqlGTIDSubtractRPCRequest() *MysqlGTIDSubtractRPCRequest { + return &MysqlGTIDSubtractRPCRequest{} +} + +func NewMysqlGTIDSubtractRPCResponse(code string) *MysqlGTIDSubtractRPCResponse { + return &MysqlGTIDSubtractRPCResponse{RetCode: code} +} + // user type MysqlUserRPCRequest struct { // The IP of this request diff --git a/src/mysql/api.go b/src/mysql/api.go index 3a5281b..3aa4add 100644 --- a/src/mysql/api.go +++ b/src/mysql/api.go @@ -165,7 +165,7 @@ func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) b // gtid_sub is not none, means the follower gtid is bigger than candidate gtid // if viewdiff<=0 and gtid_sub is not null it must be localcommitted - gtid_sub, err := m.GetGtidSubtract(fGTID, cGTID) + gtid_sub, err := m.GetGTIDSubtract(fGTID, cGTID) if err != nil { log.Error("mysql.CheckGTID.error[%v]", err) return false @@ -176,12 +176,12 @@ func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) b return false } -func (m *Mysql) GetGtidSubtract(subsetGTID string, setGTID string) (string, error) { +func (m *Mysql) GetGTIDSubtract(subsetGTID string, setGTID string) (string, error) { db, err := m.getDB() if err != nil { return "", err } - return m.mysqlHandler.GetGtidSubtract(db, subsetGTID, setGTID) + return m.mysqlHandler.GetGTIDSubtract(db, subsetGTID, setGTID) } // StartSlaveIOThread used to start the slave io thread. diff --git a/src/mysql/mock.go b/src/mysql/mock.go index 44919d1..c437362 100644 --- a/src/mysql/mock.go +++ b/src/mysql/mock.go @@ -172,8 +172,8 @@ func DefaultCheckGTID(followerGTID *model.GTID, leaderGTID *model.GTID) bool { return false } -// GetGtidSubtract mock. -func (mogtid *MockGTID) GetGtidSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) { +// GetGTIDSubtract mock. +func (mogtid *MockGTID) GetGTIDSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) { return mogtid.GetGtidSubtractFn(db, slaveGTID, masterGTID) } diff --git a/src/mysql/mysql_handler.go b/src/mysql/mysql_handler.go index 7e23c3d..d52f137 100644 --- a/src/mysql/mysql_handler.go +++ b/src/mysql/mysql_handler.go @@ -54,7 +54,7 @@ type MysqlHandler interface { GetUUID(db *sql.DB) (string, error) // get gtid subtract with slavegtid and master gtid - GetGtidSubtract(*sql.DB, string, string) (string, error) + GetGTIDSubtract(*sql.DB, string, string) (string, error) // set global variables SetGlobalSysVar(db *sql.DB, varsql string) error diff --git a/src/mysql/mysqlbase.go b/src/mysql/mysqlbase.go index cd88702..a243488 100644 --- a/src/mysql/mysqlbase.go +++ b/src/mysql/mysqlbase.go @@ -215,8 +215,8 @@ func (my *MysqlBase) WaitUntilAfterGTID(db *sql.DB, targetGTID string) error { return Execute(db, query) } -// GetGtidSubtract used to do "SELECT GTID_SUBTRACT('subsetGTID','setGTID') as gtid_sub" command -func (my *MysqlBase) GetGtidSubtract(db *sql.DB, subsetGTID string, setGTID string) (string, error) { +// GetGTIDSubtract used to do "SELECT GTID_SUBTRACT('subsetGTID','setGTID') as gtid_sub" command +func (my *MysqlBase) GetGTIDSubtract(db *sql.DB, subsetGTID string, setGTID string) (string, error) { query := fmt.Sprintf("SELECT GTID_SUBTRACT('%s','%s') as gtid_sub", subsetGTID, setGTID) rows, err := QueryWithTimeout(db, my.queryTimeout, query) if err != nil { diff --git a/src/mysql/rpc_mysql.go b/src/mysql/rpc_mysql.go index 590b386..c2be817 100644 --- a/src/mysql/rpc_mysql.go +++ b/src/mysql/rpc_mysql.go @@ -106,3 +106,15 @@ func (m *MysqlRPC) Status(req *model.MysqlStatusRPCRequest, rsp *model.MysqlStat rsp.Stats = m.mysql.getStats() return nil } + +// GTIDSubstract returns the mysql GTID subtract info. +func (m *MysqlRPC) GTIDSubtract(req *model.MysqlGTIDSubtractRPCRequest, rsp *model.MysqlGTIDSubtractRPCResponse) error { + var err error + + rsp.RetCode = model.OK + if rsp.Subtract, err = m.mysql.GetGTIDSubtract(req.SubsetGTID, req.SetGTID); err != nil { + rsp.RetCode = err.Error() + return nil + } + return nil +} diff --git a/src/raft/leader.go b/src/raft/leader.go index 77d9d7e..360db16 100644 --- a/src/raft/leader.go +++ b/src/raft/leader.go @@ -444,7 +444,7 @@ func (r *Leader) purgeBinlogStart() { leader.purgeBinlog() } }(r) - r.INFO("purge.bing.start[%vms]...", r.conf.PurgeBinlogInterval) + r.INFO("purge.binlog.start[%vms]...", r.conf.PurgeBinlogInterval) } func (r *Leader) purgeBinlogStop() { diff --git a/src/raft/raft.go b/src/raft/raft.go index d0fbf1f..7820e74 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -212,7 +212,7 @@ func (r *Raft) freePeers() { } } -// send command to state machine(F/C/L/I/S) loop with maxSendTime tiemout +// send command to state machine(F/C/L/I/S) loop with maxSendTime timeout // (F/C/L/I/S)-loop should handle it and return func (r *Raft) send(t int, request interface{}, maxSendTime int) (interface{}, error) { if !r.running() {