正好这些天要有一个需求要帮客户魔改Fabric-v0.6,把一些hyperchain的高级特性移植过去,借此机会把之前看过的源码在梳理一下。
下面就是对Fabric共识模块的源码分析和梳理,代码都是以Fabric-v0.6-preview为例,在1.0及后续版本中都移除了PBFT部分,用了更好的SBFT,目前这一部分还在开发中。
目录结构
可以看到共识模块目录如下。
1 2 3 4 5 6 7 8 9
| consensus ├── controller ├── executor ├── helper │ └── persist ├── noops ├── pbft └── util └── events
|
目录含义如下
controller
用来控制Fabric选择什么样的共识算法,默认是noops
。
executor
封装了消息队列中对交易的处理。
helper
对外提供接口调用和数据持久化接口。
noops
提供了如何编写Fabric共识算法的Demo。
pbft
PBFT算法的具体实现。
util
实现了一个peer节点到共识算法的一个消息通道,和一个消息队列。
流程概览
Fabric网络通过一个EventLoop
和共识算法进行交互,所有的操作都通过对事件循环中的事件监听进行推进。
整体流程如下图所示。
Consensus模块接口
fabric/consensus/consensus.go
对外提供共识模块的方法调用。
其中最核心也是每个算法必须实现的接口是Consenter
。
1 2 3 4 5 6 7 8 9 10 11
| type ExecutionConsumer interface { Executed(tag interface{}) Committed(tag interface{}, target *pb.BlockchainInfo) RolledBack(tag interface{}) StateUpdated(tag interface{}, target *pb.BlockchainInfo) } type Consenter interface { RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error ExecutionConsumer }
|
接口的具体实现在fabric/consensus/pbft/external.go
。
因为对交易的操作都是异步的,所以必须手动实现Executed
,Committed
,RolledBack
,StateUpdated
方法来监听对应动作的完成。
RecvMsg
方法用来从不用的peer
节点接收消息。
初始化共识模块
共识算法引擎在peer
启动的时候初始化,初始化的具体函数如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) { var err error engineOnce.Do(func() { engine = new(EngineImpl) engine.helper = NewHelper(coord) engine.consenter = controller.NewConsenter(engine.helper) engine.helper.setConsenter(engine.consenter) engine.peerEndpoint, err = coord.GetPeerEndpoint() engine.consensusFan = util.NewMessageFan() go func() { logger.Debug("Starting up message thread for consenter") for msg := range engine.consensusFan.GetOutChannel() { engine.consenter.RecvMsg(msg.Msg, msg.Sender) } }() }) return engine, err }
|
GetEngine
的作用是进行共识模块的初始化,同时启动一个goroutine
等待消息进入共识。
具体的engine.consenter
是在consensus/controller/controller.go
里进行选择。
1 2 3 4 5 6 7 8 9 10 11 12
| func NewConsenter(stack consensus.Stack) consensus.Consenter { plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin")) if plugin == "pbft" { logger.Infof("Creating consensus plugin %s", plugin) return pbft.GetPlugin(stack) } logger.Info("Creating default consensus plugin (noops)") return noops.GetNoops(stack) }
|
默认选择的是noops
,如果需要添加自己编写的共识模块需要在这里自行添加判断。
noops
只是演示如何编写Fabric共识模块,不要用在生产环境。
如果选择了PBFT
则会调用consensus/pbft/pbft.go
进行初始化。
使用PBFT
的batch
模式启动时会调用newObcBatch
进行PBFT
算法初始化。
PBFT只有batch
一种模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch { var err error ... op.manager = events.NewManagerImpl() op.manager.SetReceiver(op) etf := events.NewTimerFactoryImpl(op.manager) op.pbft = newPbftCore(id, config, op, etf) op.manager.Start() blockchainInfoBlob := stack.GetBlockchainInfoBlob() op.externalEventReceiver.manager = op.manager ... return op }
|
newObcBatch
主要做了这几项工作
- 初始化了
eventLoop
的消息队列。
- 设置了消息的接收者,用来处理对应的消息。
- 创建监听消息超时的定时器。
- 初始化
pbft
算法。
- 启动消息队列,不断监听事件的到来并且分发给接收者处理。
消息处理
Fabric的共识消息是通过eventLoop
注射给对应处理函数的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func SendEvent(receiver Receiver, event Event) { next := event for { next = receiver.ProcessEvent(next) if next == nil { break } } } func (em *managerImpl) Inject(event Event) { if em.receiver != nil { SendEvent(em.receiver, event) } } func (em *managerImpl) eventLoop() { for { select { case next := <-em.events: em.Inject(next) case <-em.exit: logger.Debug("eventLoop told to exit") return } } }
|
eventLoop
函数不断的从em.events
里取出事件,通过Inject
注射给对应的接收者,注意,通过SendEvent
注射给接收者的ProcessEvent
方法。
SendEvent
函数实现非常有意思,如果receiver.ProcessEvent
的返回不为nil
则不断的调用receiver.ProcessEvent
直到找到对应的消息处理函数,在ProcessEvent
函数中,其余case
均为事件处理函数,唯独pbftMessage
依赖SendEvent
发送消息给其余函数处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (instance *pbftCore) ProcessEvent(e events.Event) events.Event { ... case *pbftMessage: return pbftMessageEvent(*et) case pbftMessageEvent: msg := et logger.Debugf("Replica %d received incoming message from %v", instance.id, msg.sender) next, err := instance.recvMsg(msg.msg, msg.sender) if err != nil { break } return next case *RequestBatch: err = instance.recvRequestBatch(et) case *PrePrepare: err = instance.recvPrePrepare(et) ... }
|
可以看到*pbftMessage
和pbftMessageEvent
这两个case
通过recvMsg
的返回值又把消息分发给其余case
,非常巧妙。
PBFT
算法的不同阶段都会按着上面的流程映射到不同的处理函数往前推进,本质上是一个状态机。
至此Fabric的Consensus
模块主要流程已经梳理清楚,熟悉了这个流程以后再结合PBFT
算法的过程就可以很容易在此基础上添加新的功能了。