From 2026588a1c9a107a3cd5cd8ec6e605218569ad60 Mon Sep 17 00:00:00 2001
From: WenShuang Lu
Date: Wed, 27 Nov 2019 14:44:44 +0800
Subject: [PATCH] raft: add learner state for rebuildme

---
 src/cli/callx/callx.go  |  16 +++
 src/cli/cmd/mysql.go    |  25 ++--
 src/model/ha.go         |   1 +
 src/raft/attr.go        |   5 +
 src/raft/learner.go     | 214 ++++++++++++++++++++++++++++++
 src/raft/raft.go        |   4 +
 src/raft/raft_test.go   |  69 ++++++++++
 src/raft/rpc_ha.go      |  22 ++++
 src/raft/rpc_ha_test.go | 280 ++++++++++++++++++++++++++++++++++++++++
 9 files changed, 628 insertions(+), 8 deletions(-)
 create mode 100644 src/raft/learner.go

diff --git a/src/cli/callx/callx.go b/src/cli/callx/callx.go
index ad08a78..d30a04e 100644
--- a/src/cli/callx/callx.go
+++ b/src/cli/callx/callx.go
@@ -553,6 +553,22 @@ func DisableRaftRPC(node string) (*model.HARPCResponse, error) {
 	return rsp, err
 }
 
+func SetLearnerRPC(node string) (*model.HARPCResponse, error) {
+	cli, cleanup, err := GetClient(node)
+
+	if err != nil {
+		return nil, err
+	}
+	defer cleanup()
+
+	method := model.RPCHASetLearner
+	req := model.NewHARPCRequest()
+	rsp := model.NewHARPCResponse(model.OK)
+	err = cli.Call(method, req, rsp)
+
+	return rsp, err
+}
+
 func TryToLeaderRPC(node string) (*model.HARPCResponse, error) {
 	cli, cleanup, err := GetClient(node)
 	if err != nil {
diff --git a/src/cli/cmd/mysql.go b/src/cli/cmd/mysql.go
index 53fd531..27a55f2 100644
--- a/src/cli/cmd/mysql.go
+++ b/src/cli/cmd/mysql.go
@@ -210,11 +210,11 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
 		log.Warning("S3-->check.bestone[%v].is.OK....", bestone)
 	}
 
-	// 4. disable raft
+	// 4. set learner
 	{
-		log.Warning("S4-->disable.raft")
-		if _, err := callx.DisableRaftRPC(self); err != nil {
-			log.Error("disableRaftRPC.error[%v]", err)
+		log.Warning("S4-->set.learner")
+		if _, err := callx.SetLearnerRPC(self); err != nil {
+			log.Error("SetLearnerRPC.error[%v]", err)
 		}
 	}
 
@@ -328,11 +328,20 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
 
 	// 16. enable raft
 	{
-		log.Warning("S16-->enable.raft.begin...")
-		if _, err := callx.EnableRaftRPC(self); err != nil {
-			log.Error("enbleRaftRPC.error[%v]", err)
+		// when conf.Raft.SuperIDLE is set, disable raft again instead of enabling it
+		if conf.Raft.SuperIDLE {
+			log.Warning("S16-->disable.raft.again...")
+			if _, err := callx.DisableRaftRPC(self); err != nil {
+				log.Error("disableRaftRPC.error[%v]", err)
+			}
+			log.Warning("S16-->run.as.IDLE...")
+		} else {
+			log.Warning("S16-->enable.raft.begin...")
+			if _, err := callx.EnableRaftRPC(self); err != nil {
+				log.Error("enbleRaftRPC.error[%v]", err)
+			}
+			log.Warning("S16-->enable.raft.done...")
 		}
-		log.Warning("S16-->enable.raft.done...")
 	}
 
 	// 17. wait change to master
diff --git a/src/model/ha.go b/src/model/ha.go
index 79dbe26..c3eb6b0 100644
--- a/src/model/ha.go
+++ b/src/model/ha.go
@@ -9,6 +9,7 @@
 package model
 
 const (
+	RPCHASetLearner  = "HARPC.HASetLearner"
 	RPCHADisable     = "HARPC.HADisable"
 	RPCHAEnable      = "HARPC.HAEnable"
 	RPCHATryToLeader = "HARPC.HATryToLeader"
diff --git a/src/raft/attr.go b/src/raft/attr.go
index d84c731..fc03109 100644
--- a/src/raft/attr.go
+++ b/src/raft/attr.go
@@ -42,6 +42,9 @@ const (
 	// neither process heartbeat nor voterequest(return ErrorInvalidRequest)
 	INVALID
 
+	// LEARNER state.
+	LEARNER
+
 	// STOPPED state.
 	STOPPED
 )
@@ -59,6 +62,8 @@ func (s State) String() string {
 	case 1 << 4:
 		return "INVALID"
 	case 1 << 5:
+		return "LEARNER"
+	case 1 << 6:
 		return "STOPPED"
 	}
 	return "UNKNOW"
diff --git a/src/raft/learner.go b/src/raft/learner.go
new file mode 100644
index 0000000..7a484fd
--- /dev/null
+++ b/src/raft/learner.go
@@ -0,0 +1,214 @@
+/*
+ * Xenon
+ *
+ * Copyright 2018 The Xenon Authors.
+ * Code is licensed under the GPLv3.
+ *
+ */
+
+package raft
+
+import (
+	"model"
+)
+
+// LEARNER is a special STATE alongside the FOLLOWER/CANDIDATE/LEADER states.
+// It is usually used as READ-ONLY but does not have RAFT features, such as
+//   LEADER election
+//   FOLLOWER promotion
+//
+// Because we bring the LEARNER state in RaftRPCResponse as the vote-request response,
+// the LEARNER vote will be filtered out by other CANDIDATEs.
+// LEARNER is one member of a RAFT cluster but without the rights to vote.
+
+// Learner tuple.
+type Learner struct {
+	*Raft
+
+	// learner process heartbeat request handler
+	processHeartbeatRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse
+
+	// learner process voterequest request handler
+	processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse
+
+	// learner process raft ping request handler
+	processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse
+}
+
+// NewLearner creates new learner.
+func NewLearner(r *Raft) *Learner {
+	B := &Learner{Raft: r}
+	B.initHandlers()
+	return B
+}
+
+// Loop used to start the loop of the state machine.
+//--------------------------------------
+// State Machine
+//--------------------------------------
+// in LEARNER state, we never do leader election
+//
+func (r *Learner) Loop() {
+	// update begin
+	r.updateStateBegin()
+	r.stateInit()
+
+	for r.getState() == LEARNER {
+		select {
+		case <-r.fired:
+			r.WARNING("state.machine.loop.got.fired")
+		case e := <-r.c:
+			switch e.Type {
+			// 1) Heartbeat
+			case MsgRaftHeartbeat:
+				req := e.request.(*model.RaftRPCRequest)
+				rsp := r.processHeartbeatRequestHandler(req)
+				e.response <- rsp
+
+			// 2) RequestVote (dispatched via the handler field, like the other cases)
+			case MsgRaftRequestVote:
+				req := e.request.(*model.RaftRPCRequest)
+				rsp := r.processRequestVoteRequestHandler(req)
+				e.response <- rsp
+
+			// 3) Ping
+			case MsgRaftPing:
+				req := e.request.(*model.RaftRPCRequest)
+				rsp := r.processPingRequestHandler(req)
+				e.response <- rsp
+
+			default:
+				r.ERROR("get.unknown.request[%v]", e.Type)
+			}
+		}
+	}
+}
+
+// processHeartbeatRequest
+// EFFECT
+// handles the heartbeat request from the leader
+// In LEARNER state, we only handle the master changed
+//
+func (r *Learner) processHeartbeatRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
+	rsp := model.NewRaftRPCResponse(model.OK)
+	rsp.Raft.From = r.getID()
+	rsp.Raft.ViewID = r.getViewID()
+	rsp.Raft.EpochID = r.getEpochID()
+	rsp.Raft.State = r.state.String()
+	rsp.Relay_Master_Log_File = r.mysql.RelayMasterLogFile()
+
+	if !r.checkRequest(req) {
+		rsp.RetCode = model.ErrorInvalidRequest
+		return rsp
+	}
+
+	viewdiff := (int)(r.getViewID() - req.GetViewID())
+	epochdiff := (int)(r.getEpochID() - req.GetEpochID())
+	switch {
+	case viewdiff <= 0:
+		// MySQL1: disable master semi-sync because I am a slave
+		if err := r.mysql.DisableSemiSyncMaster(); err != nil {
+			r.ERROR("mysql.DisableSemiSyncMaster.error[%v]", err)
+		}
+
+		// MySQL2: set mysql readonly(mysql maybe down and up then the LEADER changes)
+		if err := r.mysql.SetReadOnly(); err != nil {
+			r.ERROR("mysql.SetReadOnly.error[%v]", err)
+		}
+
+		// MySQL3: start slave
+		if err := r.mysql.StartSlave(); err != nil {
+			r.ERROR("mysql.StartSlave.error[%v]", err)
+		}
+
+		// MySQL4: change master
+		if r.getLeader() != req.GetFrom() {
+			r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].change.mysql.master[%+v]", req.GetFrom(), req.GetViewID(), req.GetEpochID(), req.GetGTID())
+
+			if err := r.mysql.ChangeMasterTo(&req.Repl); err != nil {
+				r.ERROR("change.master.to[FROM:%v, GTID:%v].error[%v]", req.GetFrom(), req.GetRepl(), err)
+				rsp.RetCode = model.ErrorChangeMaster
+				return rsp
+			}
+			r.leader = req.GetFrom()
+		}
+
+		// view change
+		if viewdiff < 0 {
+			r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.view", req.GetFrom(), req.GetViewID(), req.GetEpochID())
+			r.updateView(req.GetViewID(), req.GetFrom())
+		}
+
+		// epoch change
+		if epochdiff != 0 {
+			r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.epoch", req.GetFrom(), req.GetViewID(), req.GetEpochID())
+			r.updateEpoch(req.GetEpochID(), req.GetPeers())
+		}
+	}
+	return rsp
+}
+
+// processRequestVoteRequest
+// EFFECT
+// handles the requestvote request from other CANDIDATEs
+// LEARNER is special: it returns ErrorVoteNotGranted, except ErrorInvalidRequest when checkRequest fails
+//
+// RETURN
+// 1. ErrorVoteNotGranted: dont give any vote
+func (r *Learner) processRequestVoteRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
+	rsp := model.NewRaftRPCResponse(model.ErrorVoteNotGranted)
+	rsp.Raft.From = r.getID()
+	rsp.Raft.ViewID = r.getViewID()
+	rsp.Raft.EpochID = r.getEpochID()
+	rsp.Raft.State = r.state.String()
+
+	if !r.checkRequest(req) {
+		rsp.RetCode = model.ErrorInvalidRequest
+		return rsp
+	}
+	return rsp
+}
+
+func (r *Learner) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
+	rsp := model.NewRaftRPCResponse(model.OK)
+	rsp.Raft.State = r.state.String()
+	return rsp
+}
+
+func (r *Learner) stateInit() {
+	// 1. stop vip
+	if err := r.leaderStopShellCommand(); err != nil {
+		// TODO(array): what todo?
+		r.ERROR("stopshell.error[%v]", err)
+	}
+
+	// MySQL1: set readonly
+	if err := r.mysql.SetReadOnly(); err != nil {
+		r.ERROR("mysql.SetReadOnly.error[%v]", err)
+	}
+
+	// MySQL2. set mysql slave system variables
+	if err := r.mysql.SetSlaveGlobalSysVar(); err != nil {
+		r.ERROR("mysql.SetSlaveGlobalSysVar.error[%v]", err)
+	}
+}
+
+// handlers
+func (r *Learner) initHandlers() {
+	r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest)
+	r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest)
+	r.setProcessPingRequestHandler(r.processPingRequest)
+}
+
+// for tests
+func (r *Learner) setProcessHeartbeatRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
+	r.processHeartbeatRequestHandler = f
+}
+
+func (r *Learner) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
+	r.processRequestVoteRequestHandler = f
+}
+
+func (r *Learner) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
+	r.processPingRequestHandler = f
+}
diff --git a/src/raft/raft.go b/src/raft/raft.go
index c2b3710..4dae6ac 100644
--- a/src/raft/raft.go
+++ b/src/raft/raft.go
@@ -63,6 +63,7 @@ type Raft struct {
 	F               *Follower
 	I               *Idle
 	IV              *Invalid
+	LN              *Learner
 	peers           map[string]*Peer
 	stats           model.RaftStats
 	skipPurgeBinlog bool // if true, purge binlog will skipped
@@ -89,6 +90,7 @@ func NewRaft(id string, conf *config.RaftConfig, log *xlog.Log, mysql *mysql.Mys
 	r.F = NewFollower(r)
 	r.I = NewIdle(r)
 	r.IV = NewInvalid(r)
+	r.LN = NewLearner(r)
 
 	// setup raft timeout
 	r.resetHeartbeatTimeout()
@@ -234,6 +236,8 @@ func (r *Raft) stateLoop() {
 			r.I.Loop()
 		case INVALID:
 			r.IV.Loop()
+		case LEARNER:
+			r.LN.Loop()
 		}
 		state = r.getState()
 	}
diff --git a/src/raft/raft_test.go b/src/raft/raft_test.go
index 2af6e71..2e52730 100644
--- a/src/raft/raft_test.go
+++ b/src/raft/raft_test.go
@@ -1779,3 +1779,72 @@ func TestRaftLeaderChangeToMasterError(t *testing.T) {
 		assert.True(t, got < want)
 	}
 }
+
+// TEST EFFECTS:
+// test election under LEARNER in the minority
+//
+// TEST PROCESSES:
+// 1. set rafts GTID
+// 1.0 rafts[0] with MockGTID_X1{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123}
+// 1.1 rafts[1] with MockGTID_X3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123}
+// 1.2 rafts[2] with MockGTID_X5{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123}
+// 2. Start 3 rafts state as FOLLOWER
+// 3. wait rafts[2] elected as leader
+// 4. set rafts[0] to LEARNER
+// 5. wait a few election cycles, the leader remains the same
+func TestRaftElectionUnderLearnerInMinority(t *testing.T) {
+	log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
+	port := common.RandomPort(8000, 9000)
+	_, rafts, cleanup := MockRafts(log, port, 3)
+	defer cleanup()
+
+	// 1. set rafts GTID
+	// 1.0 rafts[0] with MockGTIDX1{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123}
+	// 1.1 rafts[1] with MockGTIDX3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123}
+	// 1.2 rafts[2] with MockGTIDX5{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123}
+	{
+		rafts[0].mysql.SetMysqlHandler(mysql.NewMockGTIDX1())
+		rafts[1].mysql.SetMysqlHandler(mysql.NewMockGTIDX3())
+		rafts[2].mysql.SetMysqlHandler(mysql.NewMockGTIDX5())
+	}
+
+	// 2. start 3 rafts state as FOLLOWER
+	for _, raft := range rafts {
+		raft.Start()
+	}
+
+	// 3. check new leader is rafts[2]
+	var whoisleader int
+	{
+		var got State
+		MockWaitLeaderEggs(rafts, 1)
+		want := (LEADER + FOLLOWER + FOLLOWER)
+		for i, raft := range rafts {
+			got += raft.getState()
+			if raft.getState() == LEADER {
+				whoisleader = i
+			}
+		}
+		assert.Equal(t, want, got)
+		assert.Equal(t, whoisleader, 2)
+	}
+
+	// 4. set rafts[0] to LEARNER and set rafts[2] to FOLLOWER
+	MockStateTransition(rafts[whoisleader], FOLLOWER)
+	MockStateTransition(rafts[0], LEARNER)
+
+	// 5. wait a few election cycles, the leader remains the same
+	{
+		var got State
+		time.Sleep(time.Millisecond * 3000)
+		want := (LEADER + FOLLOWER + LEARNER)
+		for i, raft := range rafts {
+			got += raft.getState()
+			if raft.getState() == LEADER {
+				whoisleader = i
+			}
+		}
+		assert.Equal(t, want, got)
+		assert.Equal(t, whoisleader, 2)
+	}
+}
diff --git a/src/raft/rpc_ha.go b/src/raft/rpc_ha.go
index c6931d2..7f33809 100644
--- a/src/raft/rpc_ha.go
+++ b/src/raft/rpc_ha.go
@@ -37,6 +37,23 @@ func (h *HARPC) HADisable(req *model.HARPCRequest, rsp *model.HARPCResponse) err
 	return nil
 }
 
+// HASetLearner rpc.
+func (h *HARPC) HASetLearner(req *model.HARPCRequest, rsp *model.HARPCResponse) error {
+	h.raft.WARNING("RPC.HASetLearner.call.from[%v]", req.GetFrom())
+
+	// except state STOPPED
+	state := h.raft.getState()
+	switch state {
+	case STOPPED:
+		rsp.RetCode = model.ErrorInvalidRequest
+		return nil
+	}
+	h.raft.setState(LEARNER)
+	h.raft.loopFired()
+	rsp.RetCode = model.OK
+	return nil
+}
+
 // HAEnable rpc.
 func (h *HARPC) HAEnable(req *model.HARPCRequest, rsp *model.HARPCResponse) error {
 	h.raft.WARNING("RPC.HAEnable.call.from[%v]", req.GetFrom())
@@ -54,6 +71,11 @@ func (h *HARPC) HAEnable(req *model.HARPCRequest, rsp *model.HARPCResponse) erro
 		}
 		rsp.RetCode = model.OK
 		return nil
+	case LEARNER:
+		h.raft.setState(FOLLOWER)
+		h.raft.loopFired()
+		rsp.RetCode = model.OK
+		return nil
 	case STOPPED:
 		rsp.RetCode = model.ErrorInvalidRequest
 		return nil
diff --git a/src/raft/rpc_ha_test.go b/src/raft/rpc_ha_test.go
index 61cf017..8f8a910 100644
--- a/src/raft/rpc_ha_test.go
+++ b/src/raft/rpc_ha_test.go
@@ -166,6 +166,286 @@ func TestRaftRPCHA(t *testing.T) {
 	}
 }
 
+// TEST EFFECTS:
+// test a hasetlearner command from follower by the client
+//
+// TEST PROCESSES:
+// 1. Start rpc server
+// 2. send command to rpc server
+// 3. check the response
+func TestRaftRPCHASetLearnerFromFollower(t *testing.T) {
+	log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
+	port := common.RandomPort(8000, 9000)
+	names, rafts, scleanup := MockRafts(log, port, 3)
+	learner := 2
+	defer scleanup()
+
+	// 1. Start 3 rafts state as FOLLOWER
+	{
+		for _, raft := range rafts {
+			raft.Start()
+		}
+
+		var want, got State
+		got = 0
+		want = (FOLLOWER + FOLLOWER + FOLLOWER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+
+		// [FOLLOWER, FOLLOWER, FOLLOWER]
+		assert.Equal(t, want, got)
+	}
+
+	// 2. set rafts[2] to LEARNER
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHASetLearner
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 3. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + LEARNER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, LEARNER]
+		assert.Equal(t, want, got)
+	}
+
+	// 4. enable ha for rafts[2]
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHAEnable
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 5. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + FOLLOWER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, FOLLOWER]
+		assert.Equal(t, want, got)
+	}
+}
+
+// TEST EFFECTS:
+// test a hasetlearner command from invalid by the client
+//
+// TEST PROCESSES:
+// 1. Start rpc server
+// 2. send command to rpc server
+// 3. check the response
+func TestRaftRPCHASetLearnerFromInvalid(t *testing.T) {
+	log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
+	port := common.RandomPort(8000, 9000)
+	names, rafts, scleanup := MockRafts(log, port, 3)
+	learner := 2
+	defer scleanup()
+
+	// 1. Start 3 rafts state as FOLLOWER and set rafts[2] to INVALID
+	{
+		for _, raft := range rafts {
+			raft.Start()
+		}
+
+		MockStateTransition(rafts[learner], INVALID)
+
+		var want, got State
+		got = 0
+		want = (FOLLOWER + FOLLOWER + INVALID)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+
+		// [FOLLOWER, FOLLOWER, INVALID]
+		assert.Equal(t, want, got)
+	}
+
+	// 2. set rafts[2] to LEARNER
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHASetLearner
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 3. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + LEARNER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, LEARNER]
+		assert.Equal(t, want, got)
+	}
+
+	// 4. enable ha for rafts[2]
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHAEnable
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 5. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + FOLLOWER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, FOLLOWER]
+		assert.Equal(t, want, got)
+	}
+}
+
+// TEST EFFECTS:
+// test a hasetlearner command from idle by the client
+//
+// TEST PROCESSES:
+// 1. Start rpc server
+// 2. send command to rpc server
+// 3. check the response
+func TestRaftRPCHASetLearnerFromIdle(t *testing.T) {
+	log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
+	port := common.RandomPort(8000, 9000)
+	names, rafts, scleanup := MockRafts(log, port, 3)
+	learner := 2
+	defer scleanup()
+
+	// 1. Start 3 rafts state as FOLLOWER and set rafts[2] to IDLE
+	{
+		for _, raft := range rafts {
+			raft.Start()
+		}
+
+		MockStateTransition(rafts[learner], IDLE)
+
+		var want, got State
+		got = 0
+		want = (FOLLOWER + FOLLOWER + IDLE)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+
+		// [FOLLOWER, FOLLOWER, IDLE]
+		assert.Equal(t, want, got)
+	}
+
+	// 2. set rafts[2] to LEARNER
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHASetLearner
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 3. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + LEARNER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, LEARNER]
+		assert.Equal(t, want, got)
+	}
+
+	// 4. enable ha for rafts[2]
+	{
+		c, cleanup := MockGetClient(t, names[learner])
+		defer cleanup()
+
+		method := model.RPCHAEnable
+		req := model.NewHARPCRequest()
+		rsp := model.NewHARPCResponse(model.OK)
+		err := c.Call(method, req, rsp)
+		assert.Nil(t, err)
+
+		want := model.OK
+		got := rsp.RetCode
+		assert.Equal(t, want, got)
+	}
+
+	// 5. check
+	{
+		MockWaitLeaderEggs(rafts, 1)
+
+		var want, got State
+		got = 0
+		want = (LEADER + FOLLOWER + FOLLOWER)
+		for _, raft := range rafts {
+			got += raft.getState()
+		}
+		// [LEADER, FOLLOWER, FOLLOWER]
+		assert.Equal(t, want, got)
+	}
+}
+
 // TEST EFFECTS:
 // test TryToLeader command from the client
 //