-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcandidate.go
157 lines (138 loc) · 3.41 KB
/
candidate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package raft
import (
"context"
"sync"
)
// Compile-time assertion that *candidate is assignable to server; the blank
// identifier means no variable is actually kept.
var _ server = (*candidate)(nil)
// candidate implements the behavior of the consensus model while the node is
// in the candidate state.
type candidate struct {
// Embedded shared raft state; its fields and methods are promoted onto
// candidate.
*raft
// once guards ResetTimer so that the election timer is reset at most once
// per candidate value.
once sync.Once
}
// Run drives the candidate state until it ends.
//
// It starts an election among the peers of the current configuration and then
// waits on four event sources:
//   - c.Done(): the node is stopping, so Run returns ErrStopped;
//   - c.rpcArgs: an incoming RPC is passed to reactToRPCArgs, and if that
//     reports a conversion the resulting server is returned;
//   - c.ticker.C: the election timed out, so a new candidate round is returned;
//   - the vote channel: each vote is recorded, and once the decider reports a
//     majority the result of c.toLeader() is returned.
//
// It returns the server that should run next, or nil together with an error.
func (c *candidate) Run() (server, error) {
	config := c.raft.configs.GetConfig()
	votes, err := c.elect(config.GetPeers())
	if err != nil {
		return nil, err
	}
	tally := config.NewDecider()

	for {
		select {
		case <-c.Done():
			return nil, ErrStopped

		case incoming := <-c.rpcArgs:
			next, converted, err := c.reactToRPCArgs(incoming)
			if err != nil {
				return nil, err
			}
			if converted {
				return next, nil
			}

		case <-c.ticker.C:
			// If election timeout elapses: start new election.
			c.debug("Election timeout")
			return c.toCandidate(), nil

		case voter, open := <-votes:
			if !open {
				// Every vote has been consumed without reaching a majority.
				// A nil channel blocks forever, which disables this case
				// while the other events keep being served.
				c.debug("Failed to win the election")
				votes = nil
				continue
			}

			// If votes received from majority of servers: become leader.
			tally.AddVote(voter)
			if !tally.HasAchievedMajority() {
				continue
			}
			c.debug("Achieved Majority vote(%v)", tally.Counts())
			return c.toLeader()
		}
	}
}
// Handle ignores its arguments and always returns ErrIsNotLeader.
func (c *candidate) Handle(context.Context, ...Command) error {
return ErrIsNotLeader
}
// ResetTimer resets the election ticker to the duration returned by
// c.randomElectionTimeout().
//
// The reset is wrapped in c.once, so only the first call on a given candidate
// has any effect; later calls do nothing.
func (c *candidate) ResetTimer() {
	rearm := func() {
		c.debug("Reset election timer")
		c.ticker.Reset(c.randomElectionTimeout())
	}
	c.once.Do(rearm)
}
// String returns the fixed state name "Candidate".
func (c *candidate) String() string {
return "Candidate"
}
// reactToRPCArgs decides how the candidate responds to an incoming RPC.
//
// Raft rule: if an AppendEntries RPC is received from a new leader, convert to
// follower. While waiting for votes, a candidate may receive an AppendEntries
// RPC from another server claiming to be leader. If that leader's term is at
// least as large as the candidate's current term, the candidate recognizes the
// leader as legitimate and returns to follower state. If the term is smaller,
// the candidate stays in candidate state.
//
// Any RPC that is not an AppendEntries request is forwarded to the embedded
// raft's reactToRPCArgs.
//
// Returns the server to switch to and converted == true when the state
// changes; (nil, false, nil) when a stale AppendEntries is ignored; and a
// non-nil err when converting to follower fails.
func (c *candidate) reactToRPCArgs(args rpcArgs) (server server, converted bool, err error) {
	// Everything except AppendEntries is handled by the shared raft logic.
	if args.getType() != rpcArgsTypeAppendEntriesArgs {
		return c.raft.reactToRPCArgs(args)
	}

	// The sender's term is behind ours: remain a candidate.
	if args.getTerm() < c.GetCurrentTerm() {
		return nil, false, nil
	}

	// The sender's term is current or newer: step down to follower.
	follower, convErr := c.toFollower(args.getTerm())
	if convErr != nil {
		return nil, false, convErr
	}
	return follower, true, nil
}
// elect starts an election round: send RequestVote RPCs to all other servers.
//
// The request carries the candidate's current term, its id, and the index and
// term returned by c.Last(). The RPCs run concurrently, one goroutine per
// peer. The candidate's own entry in peers is counted as a granted vote
// without an RPC.
//
// The returned channel yields the id of each peer that granted its vote and is
// closed once every RPC goroutine has finished. An error is returned only when
// c.Last() fails; individual RPC failures are logged and count as no vote.
func (c *candidate) elect(peers []RaftPeer) (<-chan RaftId, error) {
	lastLogIndex, lastLogTerm, err := c.Last()
	if err != nil {
		return nil, err
	}

	request := RequestVoteArgs{
		Term:         c.GetCurrentTerm(),
		CandidateId:  c.Id(),
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
	}

	// One buffer slot per peer and at most one send per peer, so a send never
	// blocks even if the receiver has stopped reading.
	votes := make(chan RaftId, len(peers))

	go func() {
		defer close(votes)

		var inflight sync.WaitGroup
		for _, peer := range peers {
			// Copy the fields so each goroutine captures its own values.
			peerID, peerAddr := peer.Id, peer.Addr

			// Our own vote needs no RPC.
			if c.Id() == peerID {
				votes <- peerID
				continue
			}

			inflight.Add(1)
			go func() {
				defer inflight.Done()

				c.debug("-> Request a vote %s", peerID)
				reply, rpcErr := c.rpc.CallRequestVote(peerAddr, request)
				switch {
				case rpcErr != nil:
					c.debug("Call %s's RequestVote, err: %+v", peerID, rpcErr)
				case reply.VoteGranted:
					c.debug("<- Vote up %s", peerID)
					votes <- peerID
				default:
					c.debug("<- Vote down %s", peerID)
				}
			}()
		}

		// Hold the channel open until every outstanding RPC has reported.
		inflight.Wait()
	}()

	return votes, nil
}
// IsLeader always returns false for a candidate.
func (*candidate) IsLeader() bool {
return false
}