正好这些天要有一个需求要帮客户魔改Fabric-v0.6,把一些hyperchain的高级特性移植过去,借此机会把之前看过的源码在梳理一下。
下面就是对Fabric共识模块的源码分析和梳理,代码都是以Fabric-v0.6-preview为例,在1.0及后续版本中都移除了PBFT部分,用了更好的SBFT,目前这一部分还在开发中。

目录结构

可以看到共识模块目录如下。

1
2
3
4
5
6
7
8
9
consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events

目录含义如下

  • controller 用来控制Fabric选择什么样的共识算法,默认是noops
  • executor 封装了消息队列中对交易的处理。
  • helper 对外提供接口调用和数据持久化接口。
  • noops 提供了如何编写Fabric共识算法的Demo。
  • pbft PBFT算法的具体实现。
  • util 实现了一个peer节点到共识算法的一个消息通道,和一个消息队列。

流程概览

Fabric网络通过一个EventLoop和共识算法进行交互,所有的操作都通过对事件循环中的事件监听进行推进。

整体流程如下图所示。
Alt text

Consensus模块接口

fabric/consensus/consensus.go对外提供共识模块的方法调用。

其中最核心也是每个算法必须实现的接口是Consenter

1
2
3
4
5
6
7
8
9
10
11
type ExecutionConsumer interface {
Executed(tag interface{})
Committed(tag interface{}, target *pb.BlockchainInfo)
RolledBack(tag interface{})
StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
ExecutionConsumer
}

接口的具体实现在fabric/consensus/pbft/external.go

因为对交易的操作都是异步的,所以必须手动实现ExecutedCommittedRolledBackStateUpdated方法来监听对应动作的完成。

RecvMsg方法用来从不用的peer节点接收消息。

初始化共识模块

共识算法引擎在peer启动的时候初始化,初始化的具体函数如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
var err error
engineOnce.Do(func() {
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func() {
logger.Debug("Starting up message thread for consenter")
for msg := range engine.consensusFan.GetOutChannel() {
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
}
}()
})
return engine, err
}

GetEngine的作用是进行共识模块的初始化,同时启动一个goroutine等待消息进入共识。

具体的engine.consenter是在consensus/controller/controller.go里进行选择。

1
2
3
4
5
6
7
8
9
10
11
12
// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
if plugin == "pbft" {
logger.Infof("Creating consensus plugin %s", plugin)
return pbft.GetPlugin(stack)
}
logger.Info("Creating default consensus plugin (noops)")
return noops.GetNoops(stack)
}

默认选择的是noops,如果需要添加自己编写的共识模块需要在这里自行添加判断。

noops 只是演示如何编写Fabric共识模块,不要用在生产环境。

如果选择了PBFT则会调用consensus/pbft/pbft.go进行初始化。

使用PBFTbatch模式启动时会调用newObcBatch进行PBFT算法初始化。

PBFT只有batch一种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
var err error
...
op.manager = events.NewManagerImpl()
op.manager.SetReceiver(op)
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
op.externalEventReceiver.manager = op.manager
...
return op
}

newObcBatch主要做了这几项工作

  • 初始化了eventLoop的消息队列。
  • 设置了消息的接收者,用来处理对应的消息。
  • 创建监听消息超时的定时器。
  • 初始化pbft算法。
  • 启动消息队列,不断监听事件的到来并且分发给接收者处理。

消息处理

Fabric的共识消息是通过eventLoop注射给对应处理函数的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// consensus/util/events/events.go
func SendEvent(receiver Receiver, event Event) {
next := event
for {
next = receiver.ProcessEvent(next)
if next == nil {
break
}
}
}
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil {
SendEvent(em.receiver, event)
}
}
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
case <-em.exit:
logger.Debug("eventLoop told to exit")
return
}
}
}

eventLoop函数不断的从em.events里取出事件,通过Inject注射给对应的接收者,注意,通过SendEvent注射给接收者的ProcessEvent方法。

SendEvent函数实现非常有意思,如果receiver.ProcessEvent的返回不为nil则不断的调用receiver.ProcessEvent直到找到对应的消息处理函数,在ProcessEvent函数中,其余case均为事件处理函数,唯独pbftMessage依赖SendEvent发送消息给其余函数处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// consensus/pbft/pbft-core.go
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
case *pbftMessage:
return pbftMessageEvent(*et)
case pbftMessageEvent:
msg := et
logger.Debugf("Replica %d received incoming message from %v", instance.id, msg.sender)
next, err := instance.recvMsg(msg.msg, msg.sender)
if err != nil {
break
}
return next
case *RequestBatch:
err = instance.recvRequestBatch(et)
case *PrePrepare:
err = instance.recvPrePrepare(et)
...
}

可以看到*pbftMessagepbftMessageEvent这两个case通过recvMsg的返回值又把消息分发给其余case,非常巧妙。

PBFT算法的不同阶段都会按着上面的流程映射到不同的处理函数往前推进,本质上是一个状态机。

至此Fabric的Consensus模块主要流程已经梳理清楚,熟悉了这个流程以后再结合PBFT算法的过程就可以很容易在此基础上添加新的功能了。