-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathleader.go
135 lines (117 loc) · 2.74 KB
/
leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package jsn_raft
import (
"fmt"
"sync"
"github.com/jsn4ke/jsn_raft/v2/pb"
)
// runLeader executes the leader role for this node.
//
// It builds the leader-only next/match index bookkeeping for every member in
// r.config.List, appends a first log entry for the current term, starts one
// replication (through r.safeGo) for every member other than r.who, and then
// handles events until r.getServerState() no longer reports leader:
//
//   - usurper: a term number; if it is greater than the current term the node
//     becomes a follower and adopts that term.
//   - commitmentNotify: the commit index held by the commitment tracker is
//     copied into r.
//   - r.logModify: the received log is appended and every replication is
//     signalled through its fetch channel.
//   - r.leaderTransfer: the node becomes a follower.
//
// On return, replicationDone is closed and any pending items on r.logModify
// and r.leaderTransfer are discarded.
func (r *Raft) runLeader() {
	r.logger.Info("[%v] run leader",
		r.who)

	// orphanTimeout = time.After(randomTimeout(r.orphanTimeout()))
	lastLogIndex := r.lastLogIndex()

	// Closed on exit to tell every replication to stop.
	replicationDone := make(chan struct{})
	// Tells the leader that its term is over; the value received is a term.
	usurper := make(chan uint64, 1)
	// Tells the leader to refresh its own commit index.
	commitmentNotify := make(chan struct{}, 1)
	// One channel per replication, signalled when there is new data to pull.
	var fetchList []chan struct{}

	// Leader-only volatile state: next and match index per member.
	tracker := &commitment{
		rw:               sync.RWMutex{},
		notify:           commitmentNotify,
		who:              make(map[string]int),
		nextIndex:        []int64{},
		matchIndex:       []int64{},
		commitIndex:      r.getCommitIndex(),
		leaderStartIndex: lastLogIndex + 1,
	}
	// Every member gets a slot: next starts just past our last log, match at 0.
	for _, member := range r.config.List {
		tracker.who[member.Who] = len(tracker.nextIndex)
		tracker.nextIndex = append(tracker.nextIndex, lastLogIndex+1)
		tracker.matchIndex = append(tracker.matchIndex, 0)
	}

	// Append the leader's first log entry for this term and record our own
	// progress in the tracker.
	r.appendLog(&pb.JsnLog{
		Cmd: []byte(fmt.Sprintf("leader commit log term %v", r.getCurrentTerm())),
	})
	lastLogIndex = r.lastLogIndex()
	tracker.updateIndex(r.who, lastLogIndex+1, lastLogIndex)

	// Start one replication per member other than ourselves.
	for _, member := range r.config.List {
		if member.Who == r.who {
			continue
		}
		wake := make(chan struct{}, 1)
		fetchList = append(fetchList, wake)
		rpl := newReplication(r, tracker, member.Who, replicationDone, usurper, wake)
		r.safeGo("replication", func() {
			rpl.run()
		})
	}
	// All replications exist now; trigger the first synchronisation round.
	for _, wake := range fetchList {
		notifyChan(wake)
	}

	// discardPending empties r.logModify without blocking and then drops at
	// most one pending item from r.leaderTransfer.
	discardPending := func() {
		for empty := false; !empty; {
			select {
			case <-r.logModify:
			default:
				empty = true
			}
		}
		select {
		case <-r.leaderTransfer:
		default:
		}
	}

	// todo: temporary cleanup
	defer func() {
		// Tell every replication to stop, then drop whatever is still queued.
		close(replicationDone)
		discardPending()
	}()
	// Also drop anything that was queued before the leader loop starts.
	discardPending()

	// Leader event loop.
	for r.getServerState() == leader {
		select {
		case term := <-usurper:
			if term > r.getCurrentTerm() {
				r.setServerState(follower)
				r.setCurrentTerm(term)
			}
		case <-commitmentNotify:
			prevCommit := r.getCommitIndex()
			nextCommit := tracker.getCommitmentIndex()
			r.setCommitIndex(nextCommit)
			if nextCommit > prevCommit {
				// todo: persist the log
			}
		case entry := <-r.logModify:
			r.appendLog(entry)
			// NOTE(review): our own entry in the tracker was last refreshed
			// after the first log above and is not refreshed here — confirm
			// the commit calculation does not rely on the leader's match index.
			for _, wake := range fetchList {
				notifyChan(wake)
			}
		case <-r.leaderTransfer:
			r.setServerState(follower)
		}
	}
}