From b162334429e68c6f62414a82a664a58d65b4fa10 Mon Sep 17 00:00:00 2001 From: tai Date: Wed, 17 Jul 2019 10:34:41 +0800 Subject: [PATCH] patch: Enhance raft's invalid logic --- src/mysql/api.go | 43 +++++++++++-- src/mysql/api_test.go | 117 +++++++++++++++++++++++++++--------- src/mysql/mock.go | 37 ++++++------ src/mysql/mysql.go | 21 +++++++ src/mysql/mysql_handler.go | 3 + src/mysql/mysqlbase.go | 17 +++++- src/mysql/mysqlbase_test.go | 18 +++++- src/raft/follower.go | 8 +-- src/raft/raft_test.go | 6 +- 9 files changed, 210 insertions(+), 60 deletions(-) diff --git a/src/mysql/api.go b/src/mysql/api.go index 36d0cd5..168d1f6 100644 --- a/src/mysql/api.go +++ b/src/mysql/api.go @@ -114,27 +114,58 @@ func (m *Mysql) GTIDGreaterThan(gtid *model.GTID) (bool, model.GTID, error) { return cmp > 0, this, nil } +func (m *Mysql) GetLocalGTID(gtid string) (string, error) { + log := m.log + if gtid == "" { + return "", nil + } + + uuid, err := m.GetUUID() + if err != nil { + log.Error("mysql.GetLocalGTID.error[%v]", err) + return "", err + } + + s_gtid := strings.Split(gtid, ",") + for _, gtid := range s_gtid { + if strings.Contains(gtid, uuid) { + return gtid, nil + } + } + + return "", nil +} + // CheckGTID use to compare the followerGTID and candidateGTID func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) bool { log := m.log - fRetrivedGTID := followerGTID.Retrieved_GTID_Set - cRetrivedGTID := candidateGTID.Retrieved_GTID_Set + fExecutedGTID := followerGTID.Executed_GTID_Set + fGTID, err := m.GetLocalGTID(fExecutedGTID) + if err != nil { + log.Error("mysql.CheckGTID.error[%v]", err) + } + + cExecutedGTID := candidateGTID.Executed_GTID_Set + cGTID, err := m.GetLocalGTID(cExecutedGTID) + if err != nil { + log.Error("mysql.CheckGTID.error[%v]", err) + } // follower never generate events, should vote, but if some one execute reset master, this may be error // if a normal restart the follower retrived_gtid_set will be "" can't setState(INVALID) - if fRetrivedGTID == "" { + if fGTID == "" { return false } // candidate has none RetrivedGTID, may be none retrived_gtid_set // this means the candidate or new leader has not written, shouldnt vote - if cRetrivedGTID == "" { + if cGTID == "" { return false } // gtid_sub is not none, means the follower gtid is bigger than candidate gtid - // if viewdiff<=0 it must be localcommitted - gtid_sub, err := m.GetGtidSubtract(fRetrivedGTID, cRetrivedGTID) + // if viewdiff<=0 and gtid_sub is not null it must be localcommitted + gtid_sub, err := m.GetGtidSubtract(fGTID, cGTID) if err != nil { log.Error("mysql.CheckGTID.error[%v]", err) return false diff --git a/src/mysql/api_test.go b/src/mysql/api_test.go index a6dab4b..44c05ed 100644 --- a/src/mysql/api_test.go +++ b/src/mysql/api_test.go @@ -178,6 +178,27 @@ func TestWaitUntilAfterGTID(t *testing.T) { assert.Nil(t, err) } +func TestGetLocalGTID(t *testing.T) { + db, mock, err := sqlmock.New() + assert.Nil(t, err) + defer db.Close() + + //log + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + conf := config.DefaultMysqlConfig() + mysql := NewMysql(conf, log) + mysql.db = db + + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + + want := "84030605-66aa-11e6-9465-52540e7fd51c:1-160" + got, err := mysql.GetLocalGTID("84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-66bb-11e6-9465-52540e7fd51c:1-160") + assert.Equal(t, want, got) +} + func TestCheckGTID(t *testing.T) { db, mock, err := sqlmock.New() assert.Nil(t, err) @@ -190,13 +211,18 @@ func TestCheckGTID(t *testing.T) { mysql := NewMysql(conf, log) mysql.db = db - // local is a normal follower, leader Executed_Gtid_Set is "" + // local is a normal follower, leader Executed_GTID_Set is "" { + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID1 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", } GTID2 = model.GTID{ - Retrieved_GTID_Set: "", + Executed_GTID_Set: "", } want := false @@ -205,13 +231,13 @@ func TestCheckGTID(t *testing.T) { assert.Equal(t, want, got) } - // local is a normal follower Retrieved_GTID_Set is "", leader Executed_Gtid_Set is "" + // local is a normal follower Executed_GTID_Set is "", leader Executed_GTID_Set is "" { GTID1 = model.GTID{ - Retrieved_GTID_Set: "", + Executed_GTID_Set: "", } GTID2 = model.GTID{ - Retrieved_GTID_Set: "", + Executed_GTID_Set: "", } want := false @@ -220,13 +246,19 @@ func TestCheckGTID(t *testing.T) { assert.Equal(t, want, got) } - // local is a normal follower Retrieved_GTID_Set is "", leader do some dml + // local is a normal follower Executed_GTID_Set is "", leader do some dml { GTID1 = model.GTID{ - Retrieved_GTID_Set: "", + Executed_GTID_Set: "", } + + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID2 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", } want := false @@ -237,37 +269,57 @@ func TestCheckGTID(t *testing.T) { // local is a leader bug sprain, remote has leader but has none write { + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID1 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", } + + query = "SELECT @@SERVER_UUID" + columns = []string{"@@SERVER_UUID"} + mockRows = sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID2 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", } - query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub" - log.Warning("%v", query) - columns := []string{"gtid_sub"} - mockRows := sqlmock.NewRows(columns).AddRow("") + query = "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub" + columns = []string{"gtid_sub"} + mockRows = sqlmock.NewRows(columns).AddRow("") mock.ExpectQuery(query).WillReturnRows(mockRows) want := false got := mysql.CheckGTID(>ID1, >ID2) - assert.Equal(t, want, got) } // local is a leader bug sprain, remote has leader has writen { + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID1 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", } + + query = "SELECT @@SERVER_UUID" + columns = []string{"@@SERVER_UUID"} + mockRows = sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID2 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", } - query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub" - columns := []string{"gtid_sub"} - mockRows := sqlmock.NewRows(columns).AddRow("") + query = "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub" + columns = []string{"gtid_sub"} + mockRows = sqlmock.NewRows(columns).AddRow("") mock.ExpectQuery(query).WillReturnRows(mockRows) want := false @@ -278,16 +330,27 @@ func TestCheckGTID(t *testing.T) { // local is a leader bug sprain and localcommitted, remote has leader has writen { + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID1 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-161", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-161", } + + query = "SELECT @@SERVER_UUID" + columns = []string{"@@SERVER_UUID"} + mockRows = sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + GTID2 = model.GTID{ - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", + Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", } - query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-161','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub" - columns := []string{"gtid_sub"} - mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c:161") + query = "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-161','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub" + columns = []string{"gtid_sub"} + mockRows = sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c:161") mock.ExpectQuery(query).WillReturnRows(mockRows) want := true @@ -521,7 +584,7 @@ func TestGetGTID(t *testing.T) { want := model.GTID{Master_Log_File: "mysql-bin.000001", Read_Master_Log_Pos: 147, - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", + Retrieved_GTID_Set: "", Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Slave_IO_Running: true, Slave_SQL_Running: true, diff --git a/src/mysql/mock.go b/src/mysql/mock.go index 2d6fa5e..7d4fb4a 100644 --- a/src/mysql/mock.go +++ b/src/mysql/mock.go @@ -35,6 +35,7 @@ type MockGTID struct { ChangeToMasterFn func(*sql.DB) error WaitUntilAfterGTIDFn func(*sql.DB, string) error GetGtidSubtractFn func(*sql.DB, string, string) (string, error) + GetUUIDFn func(*sql.DB) (string, error) CheckGTIDFn func(*model.GTID, *model.GTID) bool SetGlobalSysVarFn func(*sql.DB, string) error ResetMasterFn func(*sql.DB) error @@ -70,6 +71,16 @@ func (mogtid *MockGTID) GetSlaveGTID(db *sql.DB) (*model.GTID, error) { return mogtid.GetSlaveGTIDFn(db) } +// DefaultGetUUID mock. +func DefaultGetUUID(db *sql.DB) (string, error) { + return "84030605-66aa-11e6-9465-52540e7fd51c", nil +} + +// GetUUID mock. +func (mogtid *MockGTID) GetUUID(db *sql.DB) (string, error) { + return mogtid.GetUUIDFn(db) +} + // DefaultGetMasterGTID mock. func DefaultGetMasterGTID(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} @@ -386,6 +397,7 @@ func defaultMockGTID() *MockGTID { mock.ChangeToMasterFn = DefaultChangeToMaster mock.WaitUntilAfterGTIDFn = DefaultWaitUntilAfterGTID mock.GetGtidSubtractFn = DefaultGetGtidSubtract + mock.GetUUIDFn = DefaultGetUUID mock.CheckGTIDFn = DefaultCheckGTID mock.SetGlobalSysVarFn = DefaultSetGlobalSysVar mock.ResetMasterFn = DefaultResetMaster @@ -450,31 +462,21 @@ func GetMasterGTIDA(db *sql.DB) (*model.GTID, error) { return gtid, nil } +func GetUUIDA(db *sql.DB) (string, error) { + return "052077a5-b6f4-ee1b-61ec-d80a8b27d749", nil +} + // NewMockGTIDA mock. func NewMockGTIDA() *MockGTID { mock := defaultMockGTID() mock.GetMasterGTIDFn = GetMasterGTIDA mock.GetSlaveGTIDFn = GetSlaveGTIDA + mock.GetUUIDFn = GetUUIDA return mock } -// GetSlaveGTIDLC mock. -func GetSlaveGTIDLC(db *sql.DB) (*model.GTID, error) { - gtid := &model.GTID{} - gtid.Master_Log_File = "" - gtid.Read_Master_Log_Pos = 0 - gtid.Slave_IO_Running = true - gtid.Slave_SQL_Running = true - gtid.Slave_IO_Running_Str = "Yes" - gtid.Slave_SQL_Running_Str = "Yes" - gtid.Seconds_Behind_Master = "1" - gtid.Last_Error = "" - gtid.Slave_SQL_Running_State = "Slave has read all relay log; waiting for the slave I/O thread to update it" - gtid.Executed_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-37, - 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` - gtid.Retrieved_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-36, - 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` - return gtid, nil +func GetUUIDLC(db *sql.DB) (string, error) { + return "052077a5-b6f4-ee1b-61ec-d80a8b27d749", nil } // GetMasterGTIDLC mock. @@ -505,6 +507,7 @@ func NewMockGTIDLC() *MockGTID { mock.GetSlaveGTIDFn = GetMasterGTIDLC mock.CheckGTIDFn = CheckGTIDLC mock.GetGtidSubtractFn = GetGtidSubtractInvalid + mock.GetUUIDFn = GetUUIDLC return mock } diff --git a/src/mysql/mysql.go b/src/mysql/mysql.go index 0bea927..3f62f81 100644 --- a/src/mysql/mysql.go +++ b/src/mysql/mysql.go @@ -123,6 +123,27 @@ func (m *Mysql) Ping() { m.pingEntry = *pe } +// GetUUID used to get local uuid. +func (m *Mysql) GetUUID() (string, error) { + var err error + var db *sql.DB + var uuid string + log := m.log + + if db, err = m.getDB(); err != nil { + log.Error("mysql.get.local.uuid.error[%v]", err) + return "", err + } + + if uuid, err = m.mysqlHandler.GetUUID(db); err != nil { + log.Error("mysql.get.local.uuid.error[%v]", err) + return "", err + } + log.Info("mysql.get.local.uuid:[%v]", uuid) + + return uuid, nil +} + // GetMasterGTID used to get master binlog info. func (m *Mysql) GetMasterGTID() (*model.GTID, error) { var err error diff --git a/src/mysql/mysql_handler.go b/src/mysql/mysql_handler.go index 332f062..58e0f53 100644 --- a/src/mysql/mysql_handler.go +++ b/src/mysql/mysql_handler.go @@ -48,6 +48,9 @@ type MysqlHandler interface { // waits until slave replication reaches at least targetGTID WaitUntilAfterGTID(*sql.DB, string) error + // get local uuid + GetUUID(db *sql.DB) (string, error) + // get gtid subtract with slavegtid and master gtid GetGtidSubtract(*sql.DB, string, string) (string, error) diff --git a/src/mysql/mysqlbase.go b/src/mysql/mysqlbase.go index 3cf353b..639a637 100644 --- a/src/mysql/mysqlbase.go +++ b/src/mysql/mysqlbase.go @@ -126,7 +126,6 @@ func (my *MysqlBase) GetMasterGTID(db *sql.DB) (*model.GTID, error) { gtid.Master_Log_File = row["File"] gtid.Read_Master_Log_Pos, _ = strconv.ParseUint(row["Position"], 10, 64) gtid.Executed_GTID_Set = row["Executed_Gtid_Set"] - gtid.Retrieved_GTID_Set = row["Executed_Gtid_Set"] gtid.Seconds_Behind_Master = "0" gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true @@ -134,6 +133,22 @@ func (my *MysqlBase) GetMasterGTID(db *sql.DB) (*model.GTID, error) { return gtid, nil } +// GetUUID used to get local uuid. +func (my *MysqlBase) GetUUID(db *sql.DB) (string, error) { + uuid := "" + query := "SELECT @@SERVER_UUID" + rows, err := QueryWithTimeout(db, reqTimeout, query) + if err != nil { + return uuid, err + } + if len(rows) > 0 { + row := rows[0] + uuid = row["@@SERVER_UUID"] + } + + return uuid, nil +} + // StartSlaveIOThread used to start the io thread. func (my *MysqlBase) StartSlaveIOThread(db *sql.DB) error { cmd := "START SLAVE IO_THREAD" diff --git a/src/mysql/mysqlbase_test.go b/src/mysql/mysqlbase_test.go index f42f0ed..21926ce 100644 --- a/src/mysql/mysqlbase_test.go +++ b/src/mysql/mysqlbase_test.go @@ -136,7 +136,7 @@ func TestMysqlBaseGetMasterGTID(t *testing.T) { want := model.GTID{Master_Log_File: "mysql-bin.000001", Read_Master_Log_Pos: 147, - Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", + Retrieved_GTID_Set: "", Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Slave_IO_Running: true, Slave_SQL_Running: true, @@ -159,6 +159,22 @@ func TestMysqlBaseGetMasterGTID(t *testing.T) { } } +func TestGetUUID(t *testing.T) { + db, mock, err := sqlmock.New() + assert.Nil(t, err) + defer db.Close() + + query := "SELECT @@SERVER_UUID" + columns := []string{"@@SERVER_UUID"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c") + mock.ExpectQuery(query).WillReturnRows(mockRows) + + want := "84030605-66aa-11e6-9465-52540e7fd51c" + + got, _ := mysqlbase.GetUUID(db) + assert.Equal(t, want, got) +} + func TestMysqlBaseChangeMasterToCommand(t *testing.T) { db, _, err := sqlmock.New() assert.Nil(t, err) diff --git a/src/raft/follower.go b/src/raft/follower.go index a88d826..887063e 100644 --- a/src/raft/follower.go +++ b/src/raft/follower.go @@ -315,11 +315,12 @@ func (r *Follower) startCheckUpgradeToC() { } } - if cnt >= r.GetQuorums() { - r.WARNING("ping.responses[%v].more.than.half.upgrade.to.candidate", cnt) - r.fUpgradeToC = true + if cnt < r.GetQuorums() { + r.fUpgradeToC = false continue } + r.WARNING("ping.responses[%v].more.than.half.upgrade.to.candidate", cnt) + r.fUpgradeToC = true } } }() @@ -377,7 +378,6 @@ func (r *Follower) degradeToInvalid(followerGTID *model.GTID, candidateGTID *mod r.setState(INVALID) return } - return } // setMySQLAsync used to setting mysql in async diff --git a/src/raft/raft_test.go b/src/raft/raft_test.go index 485f519..2af6e71 100644 --- a/src/raft/raft_test.go +++ b/src/raft/raft_test.go @@ -825,7 +825,6 @@ func TestRaftEpochChangeUnderIDLE(t *testing.T) { // wait epoch change broadcast MockWaitLeaderEggs(rafts, 0) - whoisleader = 0 for _, raft := range rafts { peers := raft.GetPeers() for _, peer := range peers { @@ -1050,10 +1049,8 @@ func TestRaft11Rafts1Cluster(t *testing.T) { // wait leader eggs MockWaitLeaderEggs(rafts, 1) - whoisleader = 0 - for i, raft := range rafts { + for _, raft := range rafts { if raft.getState() == LEADER { - whoisleader = i break } } @@ -1732,6 +1729,7 @@ func TestRaftLeaderWaitUntilAfterGTIDError(t *testing.T) { want := (LEADER + FOLLOWER + FOLLOWER) for _, raft := range rafts { got += raft.getState() + log.Printf("test.raft.state.[%v]", raft.getState()) } assert.True(t, got < want) }