A simple consensus of PBFT
使用 Go 实现简单的 PBFT 共识。共识具体示意图如下:
既然是共识,肯定会有几点,在本共识中,节点定义如下:
type Node struct {
NodeID string
NodeTable map[string]string
View *View
CurrentState *State
CommitMsgs []*RequestMsg
MsgBuffer *MsgBuffer
MsgEntrance chan interface{}
MsgDelivery chan interface{}
Alarm chan bool
}
其中比较重要的是 CurrentState
,它决定着每一个节点处于什么状态并且需要执行什么操作,其具体定义如下:
type State struct {
...
CurrentStage Stage
}
type Stage int
const (
Idle Stage = iota // Node is created successfully, but the consensus process is not started yet.
PrePrepared // The ReqMsgs is processed successfully. The node is ready to head to the Prepare stage.
Prepared // Same with `prepared` stage explained in the original paper.
Committed // Same with `committed-local` stage explained in the original paper.
)
NodeTable
是一个map
类型,主要用于存放相关节点的NodeID
及其url
。其他的变量都与信息的传递有关,我们之后会讲到。
再来说一下View
字段,在PBFT
中有一个视图view
的概念,在一个视图里,一个是主节点,其余的都叫备份节点。主节点负责将来自客户端的请求排好序,然后按序发送给备份节点。但主节点可能是拜占庭的:它会给不同的请求编上相同的序号,或者不去分配序号,或者让相邻的序号不连续。备份节点应当有职责来主动检查这些序号的合法性,并能通过timeout
机制检测到主节点是否已经宕掉。当出现这些异常时,这些备份节点就会触发视图更换view change
协议选举出新的主节点。
视图是一个连续编号的整数。主节点由公式$p=v\ mod\ |R|$得到,$v$是视图编号,$p$是副本编号,$|R|$是副本集合的个数。View字段中应该不只有当前视图
ID,还应保存着当前主节点的
ID,故
View`字段的定义如下:
type View struct {
ID int64
Primary string
}
MsgBuffer
是Node
节点中用于接收信息的一个buffer
,PBFT
中消息的类型主要有四种:
ReqMsgs
:请求信息PrePrepareMsgs
:预准备阶段信息PrepareMsgs
:准备阶段信息CommitMsgs
: 提交阶段信息
MsgBuffer
就是用于接收这些信息的:
type MsgBuffer struct {
ReqMsgs []*RequestMsg
PrePrepareMsgs []*PrePrepareMsg
PrepareMsgs []*VoteMsg
CommitMsgs []*VoteMsg
}
在具体实现中,PrepareMsgs
和CommitMsgs
都属于投票信息,因此可以使用同样的结构进行定义。
RequestMsg
定义如下:
type RequestMsg struct {
Timestamp int64 `json:"timestamp"`
ClinetID string `json:"clientID"`
Operation string `json:"operation"`
SequenceID int64 `json:"sequenceID"`
}
SequenceID
表示的是发送的信息的序列号,因为需要对请求进行排序。ClientID
表示的是发送信息的客户端ID
。
PrePrepareMsgs
定义如下:
type PrePrepareMsg struct {
ViewID int64 `json:"viewID"`
SequenceID int64 `json:"sequenceID"`
Digest string `json:"digest"`
RequestMsg *RequestMsg `json:"requestMsg"`
}
除了RequestMsg之外,其余节点都是处于视图里面的操作,因此都会涉及到ViewID
字段。而且,为保证信息准确,会涉及到签名Digest
字段。
整个PBFT
共识会涉及到两次投票,因此投票信息既适用于PrepareMsgs
,也适用于CommitMsgs
。具体定义如下:
type VoteMsg struct {
ViewID int64 `json:"viewID"`
SequenceID int64 `json:"sequenceID"`
Digest string `json:"digest"`
NodeID string `json:"nodeID"`
MsgType `json:"msgType"`
}
MsgType
就决定着是属于哪一阶段的投票信息:
type MsgType int
const(
PrepareMsg MsgType = iota
CommitMsg
)
在整个共识结束后,还会将消息传递回客户端,此时需要一个ReplyMsg
,具体定义如下:
type ReplyMsg struct {
ViewID int64 `json:"viewID"`
Timestamp int64 `json:"timestamp"`
ClientID string `json:"clientID"`
NodeID string `json:"nodeID"`
Result string `json:"result"`
}
Result
即返回的最后结果。
基本定义介绍完后,我们就可以考虑开始研究共识内部的内容了,第一步肯定是要先创建节点了,其实就是初始化Node
结构体的过程:
func NewNode(nodeID string) *Node {
node := &Node {
nodeID,
map[string]string{
"Apple": "localhost:1111",
"Ball": "localhost:1112",
"Candy": "localhost:1113",
"Dog": "localhost:1114",
},
&View{
viewID,
Primary: "Apple",
},
nil,
make([]*consensus.RequestMsg, 0),
&MsgBuffer{
make([]*consensus.RequestMsg, 0),
make([]*consensus.PrePrepareMsg, 0),
make([]*consensus.VoteMsg, 0),
make([]*consensus.VoteMsg, 0),
},
// channels
make(chan interface{}),
make(chan interface{}),
make(chan bool),
}
return node
}
因为我们暂时还没有ViewID
,所以我们暂定const viewID = 10000000000
。
除了基本的初始化外,我们还需要对信息进行调度,处理信息,设立一个负责检测节点正常运行的通知器。
// Start message dispatcher
go node.dispatchMsg()
// start alarm trigger
go node.alarmToDispatcher()
// start message resolver
go node.resolveMsg()
在处理信息时,信息可能会有两大类,即普通信息和通知信息,普通信息正常处理即可,通知信息则需要额外的处理方式:
func (node *Node) dispatchMsg() {
for {
select {
case msg := <-node.MsgEntrance:
err := node.routeMsg(msg)
if err != nil {
fmt.Println(err)
}
case <- node.Alarm:
err := node.routeMsgWhenAlarmed()
if err != nil {
fmt.Println(err)
}
}
}
}
其实就是对于前面所提及的四种信息进行顺序处理(RequestMsg
, PrePrepareMsg
, PrepareMsg
, CommitMsg
).
处理时仍然分成三种情况来处理,但RequestMsg
, PrePrepareMsg
,这两种处理方式实质上是基本一致的,具体执行操作流程图如下:
逻辑代码如下:
if node.CurrentState == nil {
msgs := make([]*consensus.(MsgType), len(node.MsgBuffer.(MsgType)))
copy(msgs, node.MsgBuffer.(MsgType))
// copy 之后添加新的信息
msgs = append(msgs, msg.(*consensus.(MsgType))
// 将 buffer 清空
node.MsgBuffer.(MsgType) = make([]*consensus.(MsgType), 0)
// 开始发送消息
node.MsgDelivery <- msgs
} else {
// 否则直接添加进 buffer 中
node.MsgBuffer.(MsgType) = append(node.MsgBuffer.(MsgType), msg.(*consensus.(MsgType)))
}
(MsgType)
既可以表示为RequestMsg
,也可以表示为PrePrepareMsg
。
而PrepareMsg
, CommitMsgs
的routeMsg
与前面两种信息处理逻辑完全相反,且在判断条件上稍加了修改,对于PrepareMsg
来说:
// 判断条件
if node.CurrentState == nil || node.CurrentState.CurrentStage != consensus.PrePrepared
对于CommitMsg
来说:
// 判断条件
if node.CurrentState == nil || node.CurrentState.CurrentStage != consensus.Prepared
流程图如下所示:
在该方法中,对于四种信息的处理方式是完全一致的,唯一的区别的就是执行条件(同上一节的内容)。具体处理流程如下:
逻辑代码如下:
msgs := make([]*consensus.(MsgType), len(node.MsgBuffer.(MsgType)))
copy(msgs, node.MsgBuffer.(MsgType))
node.MsgDelivery <- msgs
(MsgType)
表示的是前面所提及的RequestMsg
, PrePrepareMsg
, PrepareMsg
, CommitMsg
四种信息。
该方法是当出现特殊情况时才会触发,这里先假定每隔一段时间执行一次。
func (node *Node) alarmToDispatcher() {
for {
time.Sleep(20 * time.Second)
node.Alarm <- true
}
}
该方法是对前面通过node.MegDelivery
接收的信息进行处理,对于每一个类型的信息又有不同的处理方式。
该方法对Message
序列进行依次处理,处理方法为 GetReq()
。
当节点的CurrentState
为空时即可调用该方法。此方法会初始化共识,并开始执行共识:
func (node *Node) GetReq(reqMsg *consensus.RequestMsg) error {
err := node.createStateForNewConsensus()
if err != nil {
return err
}
// 开始执行共识
prePrepareMsg, err := node.CurrentState.StartConsensus(reqMsg)
if err != nil {
return nil
}
// 发送 getPrePrepare 信息
if prePrepareMsg != nil {
node.Broadcast(prePrepareMsg, "/preprepare")
}
return nil
}
在开始共识时,信息类型就变成了PrePrepareMsg
,当开始执行共识时,会返回该类型的信息序列,当序列不为空时,即可对所有的节点进行广播。
该方法对Message
序列进行依次处理,处理方法为 GetPrePrepare()
。
当节点的CurrentState
为空时即可调用该方法。此方法会将共识转换至Prepare
状态:
func (node *Node) GetPrePrepare(prePrepareMsg *consensus.PrePrepareMsg) error {
err := node.createStateForNewConsensus()
if err != nil {
return err
}
prePareMsg, err := node.CurrentState.PrePrepare(prePrepareMsg)
if err != nil {
return err
}
if prePareMsg != nil {
prePareMsg.NodeID = node.NodeID
node.Broadcast(prePareMsg, "/prepare")
}
return nil
}
通过该方法处理后,信息类型此时转化成PrepareMsg
类型,同样需要将信息进行广播。
该方法对Message
序列进行依次处理,处理方法为 GetPrepare()
。
当节点的CurrentState
不为空是才可调用该方法。此方法会将共识转换至Commit
状态:
func (node *Node) GetPrepare(prepareMsg *consensus.VoteMsg) error {
commitMsg, err := node.CurrentState.Prepare(prepareMsg)
if err != nil {
return err
}
if commitMsg != nil {
commitMsg.NodeID = node.NodeID
node.Broadcast(commitMsg, "/commit")
}
return nil
}
通过该方法处理后,信息类型此时转化成CommitMsg
类型,将信息进行广播。
该方法对Message
序列进行依次处理,处理方法为 GetPrepare()
。
当节点的CurrentState
不为空是才可调用该方法。此方法会将共识转换至Reply
状态:
func (node *Node) GetCommit(prepareMsg *consensus.VoteMsg) error {
replyMsg, committedMsg, err := node.CurrentState.Commit(prepareMsg)
if err != nil {
return err
}
if replyMsg != nil {
if committedMsg == nil {
return errors.New("committed message is nil, even though the reply message is not nil")
}
replyMsg.NodeID = node.NodeID
node.CommitMsgs = append(node.CommitMsgs, committedMsg)
node.Reply(replyMsg)
}
return nil
}
通过该方法处理后,信息类型此时转化成ReplyMsg
类型,将信息进行回复,注意在Commit
阶段内,信息不能为空,若为空时,应进行错误处理。
将回复信息进行简单的输出:
func (node *Node) GetReply(msg *consensus.ReplyMsg) {
fmt.Printf("Result: %s by %s\n", msg.Result, msg.NodeID)
}
在前两种信息处理过程中,发现在每次执行开始时,由于CurrentState
为空,需要创建一个新状态:
func (node *Node) createStateForNewConsensus() error {
// 先检查是否有存在的状态
if node.CurrentState != nil {
return errors.New("another consensus is ongoing")
}
// 获取最后一个序列ID
var lastSequenceID int64
if len(node.CommitMsgs) == 0 {
lastSequenceID = -1
} else {
lastSequenceID = node.CommitMsgs[len(node.CommitMsgs)-1].SequenceID
}
// 创建一个新的状态
node.CurrentState = consensus.CreateState(node.View.ID, lastSequenceID)
return nil
}
在进行广播时,需要用到在一开始Node
中的NodeTable
进行广播。广播的消息需要进行json
处理:
func (node *Node) Broadcast(msg interface{}, path string) map[string]error {
errorMap := make(map[string]error)
for nodeID, url := range node.NodeTable {
if nodeID == node.NodeID {
continue
}
jsonMsg, err := json.Marshal(msg)
send(url + path, jsonMsg)
}
return errorMap
}
Reply
即将信息最终ReplyMsg
信息发送给客户端。
func (node *Node) Reply(msg *consensus.ReplyMsg) error {
for _, value := range node.CommitMsgs {
fmt.Printf("Committed value: %s, %d, %s, %d", value.ClinetID, value.Timestamp, value.Operation, value.SequenceID)
}
jsonMsg, err := json.Marshal(msg)
...
return nil
}
网络信息传递其实就是使用http
进行信息处理。
首先需要设置路由:
func (server *Server) setRoute() {
http.HandleFunc("/req", server.getReq)
http.HandleFunc("/preprepare", server.getPrePrepare)
http.HandleFunc("/prepare", server.getPrepare)
http.HandleFunc("/commit", server.getCommit)
http.HandleFunc("/reply", server.getReply)
}
不同的请求对应的路由不同。但每个路由的处理方法基本上是一致的。
处理信息的方式都是按以下流程执行:
func (server *Server) getXXX(w http.ResponseWriter, r *http.Request) {
var msg consensus.XXXMsg
err := json.NewDecoder(r.Body).Decode(&msg)
server.node.MsgEntrance <- &msg
}
func send(url string, msg []byte) {
buff := bytes.NewBuffer(msg)
http.Post("http://" + url, "application/json", buff)
}
信息发送方法实质上就是调用了http.Post
方法。
最后是共识部分,该部分控制着主要的共识操作。
根据PBFT
的原理,节点个数$N$应当大于等于$3f+1$,这里的$f$是指出现故障/拜占庭节点的数量。在进行Node
初始化的时候启动了4个节点,故此时f的选择为1。
在第3部分中,当node.CurrentState
为空时,需要进行状态创建,具体State
结构如下:
type State struct {
ViewID int64
MsgLogs *MsgLogs
LastSequenceID int64
CurrentStage Stage
}
type MsgLogs struct {
ReqMsg *RequestMsg
PrepareMsgs map[string]*VoteMsg
CommitMsgs map[string]*VoteMsg
}
Stage
的定义在前面提及过,在这里不再赘述。
对于CreateState
来说,可以认为是对State
状态执行初始化:
func CreateState(viewID int64, lastSequenceID int64) *State{
return &State{
ViewID: viewID,
MsgLogs: &MsgLogs{
ReqMsg:nil,
PrepareMsgs:make(map[string]*VoteMsg),
CommitMsgs:make(map[string]*VoteMsg),
},
LastSequenceID: lastSequenceID,
CurrentStage: Idle,
}
}
第3部分中所提及的对于各种状态的处理函数中,都需要执行共识部分的一些转换函数以保证共识状态实现转换。
StartConsensus
对应的是Request
阶段,其目的是将共识状态转换至PrePrepare
状态:
func (state *State) StartConsensus(request *RequestMsg)(*PrePrepareMsg, error) {
sequenceID := time.Now().UnixNano()
// 找到当前序列 id 中的最大值
if state.LastSequenceID != -1 {
for state.LastSequenceID >= sequenceID {
sequenceID += 1
}
}
// 为请求消息对象分配一个新的序列ID
request.SequenceID = sequenceID
// 向日志中保存 reqMsgs
state.MsgLogs.ReqMsg = request
// 获取请求消息的签名
digest, err := digest(request)
// 将状态转换为 pre-prepared
state.CurrentStage = PrePrepared
return PrePrepareMsg
}
在执行签名时,使用的是sha256
方法进行签名,最终返回的是PrePrepareMsg
,表示已经转换至PrePrepare
状态。
PrePrepare
方法对应的是PrePrepare
阶段,其目的是将共识状态转换至Prepare
状态:
func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {
// 获取 msg 并将其放入 log 中
state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg
// 检验信息正确与否
if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {
return nil, errors.New("pre-prepare message is corrupted")
}
// 将状态更改为 pre-prepare
state.CurrentStage = PrePrepared
return VoteMsg
}
在对由前一阶段转换后的PrePrepareMsg
进行验证后,将状态转换至Prepare
,并生成PrepareMsg
。
Prepare
方法对应的是Prepare
阶段,其目的是将共识状态转换至Commit
状态:
func (state *State) Prepare(prepareMsg *VoteMsg) (*VoteMsg, error) {
if !state.verifyMsg(prepareMsg.ViewID, prepareMsg.SequenceID, prepareMsg.Digest) {
return nil, errors.New("Prepare message is corrupted.")
}
// 输出当前投票信息
fmt.Printf("[Prepare-Vote]: %d\n", len(state.MsgLogs.PrepareMsgs))
if state.prepared() {
// 更改当前状态至 prepared
state.CurrentStage = Prepared
return VoteMsg
}
return nil, nil
}
在对由前一阶段转换后的PrepareMsg
进行验证后,将状态转换至Commit
,并生成CommitMsg
。需要注意的是,在对其进行转换前,还需要对状态进行一次判定,以确定该状态为prepared
,这样才能够将其转换至CommitMsg
类型。
prepared
方法具体逻辑如下:
func (state *State) prepared() bool {
if state.MsgLogs.ReqMsg == nil {
return false
}
if len(state.MsgLogs.PrepareMsgs) < 2 * f {
return false
}
return true
}
该方法的主要作用实际上就是在检查该共识的得票数是否达到标准,若达到标准则继续执行后面的操作,否则本次共识失败。
Commit
方法对应的是Commit
阶段,其目的是将共识状态转换至Reply
状态:
func (state *State) Commit(commitMsg *VoteMsg) (*ReplyMsg, *RequestMsg, error) {
if !state.verifyMsg(commitMsg.ViewID, commitMsg.SequenceID, commitMsg.Digest) {
return nil, nil, errors.New("commit message is corrupted")
}
if state.committed() {
// 此节点在本地执行请求的操作并获取结果。
result := "Executed"
// 更改状态至 committed
state.CurrentStage = Committed
return ReplyMsg
}
return nil, nil, nil
}
其中涉及到committed
方法,该方法与前面所述的prepared
具有同样的功能。
该方法主要是用来检查消息的准确性:
func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {
// 试图错误,将导致无法启动共识
if state.ViewID != viewID {
return false
}
// 检查是否传递错误序列号
if state.LastSequenceID != -1 {
// 要保证传递的 sequenceID 是比 LastSequenceID 大的
if state.LastSequenceID >= sequenceID {
return false
}
}
digest, err := digest(state.MsgLogs.ReqMsg)
// 检验 digest
if digestGot != digest {
return false
}
return true
}
经过上述步骤,完整的PBFT
基本实现。