本篇文章主要是PBFT共识的简单实现,其中有许多地方都做了简化。PBFT的原理已在上篇文章中描述过,如果对PBFT的原理不太清晰的的可以进行查看。文章地址:共识算法学习总结。
代码实现的主要功能有:通过客户端添加区块,使用libp2p的mdns进行节点发现,动态的添加节点。
在启动客户端时,首先根据端口号创建一个客户端,然后启动客户端。
var clientCmd = &cobra.Command{
Use: "client",
Short: "client manage",
Run: func(cmd *cobra.Command, args []string) {
// 获取客户端的端口
port, err := cmd.Flags().GetInt("port")
if err != nil {
log.Println("get param error: ", err)
}
// 客户端传递的数据
data, err := cmd.Flags().GetString("data")
if err != nil{
log.Println("get param error: ", err)
}
client := NewClient(port)
client.Start(data)
},
}
创建的客户端为libp2p节点,并设置节点的私钥。这里的加密算法使用的是Ed25519算法,不但效率更高并且可以在别的节点获取当前节点的公钥。
// 创建客户端
func NewClient(listenPort int) *Client {
// 生成密钥对
r := rand.Reader
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil{
log.Println(err)
}
pubKey := prvKey.GetPublic()
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", listenPort))
// 创建libp2p节点
h, err := libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
if err != nil {
log.Println("创建的客户端节点失败:", err)
}
h.SetStreamHandler(protocol.ID(protocolID), handleStream)
fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\n", "0.0.0.0", listenPort, h.ID().Pretty())
keyPair := Keypair{
privkey: prvKey,
pubkey: pubKey,
}
// 创建客户端
client := &Client{
h,
keyPair,
[]KnownNode{},
sync.Mutex{},
make(map[string]*common.ReplyMsg),
}
fmt.Println(">>> 创建客户端成功...")
return client
}
客户端启动时,首先不断获取网络中的节点,然后发送request请求,并等待回应。
这里做了简化,在客户端启动时就直接发送request请求。
func (c *Client) Start (data string) {
fmt.Println(">>> 开始启动客户端...")
ctx := context.Background()
// 通过协程获取网络中的节点,使用libp2p的mdns节点发现
go c.getAllKonwons(c.client)
// 发送客户端请求
c.sendRequest(ctx, data)
// 处理响应
go c.handleConnection()
select {}
}
客户端首先创建request消息,消息格式为<REQUEST, o, t, c>。o: 请求的具体操作,t: 请求时客户端追加的时间戳,c:客户端标识。REQUEST: 包含消息内容m,以及消息摘要d(m)。客户端对请求进行签名。
客户端创建完request请求后就可以向主节点发送该请求。
func (c *Client) sendRequest(ctx context.Context, data string) {
fmt.Println(">>> 客户端准备request消息...")
// 构建request
req := common.Request{
data,
hex.EncodeToString(common.GenerateDigest(data)),
}
// 序列化pubKey
marshalPubkey, err := crypto.MarshalPublicKey(c.keypair.pubkey)
sendClient := common.SendClient{
c.client.ID().Pretty(),
marshalPubkey,
}
// 构建request消息
reqMsg := common.RequestMsg{
"solve",
int(time.Now().Unix()),
sendClient,
req,
}
// 对发送的消息进行签名
sig, err := c.signMessage(reqMsg)
if err != nil{
fmt.Printf("%v\n", err)
}
// 组合并发送消息
c.send(ctx, common.ComposeMsg(common.HRequest, reqMsg, sig), c.findPrimaryNode())
fmt.Println(">>> 客户端发送消息完成...")
}
客户端发送数据时,首先连接到主节点,然后打开与主节点的stream。再打开数据发送的通道,最后序列化数据并发数据添加到发送数据的通道中。
func (c *Client) send(ctx context.Context, msg []byte, node KnownNode) {
// 开始连接到主节点
if err := c.client.Connect(ctx, node.h); err != nil{
log.Println(">>> 连接到主节点失败")
}
// 打开stream
s, err := c.client.NewStream(context.Background(), node.h.ID, protocol.ID(protocolID))
if err != nil {
fmt.Println(">>> 打开stream失败", err)
}
fmt.Println(">>> 开始连接到: ", node.h)
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// 准备发送数据的通道
go sendData(rw)
// 序列化数据
data, err := json.Marshal(msg)
if err != nil{
fmt.Println("序列化数据错误", err)
}
sendDataChan <- data
close(sendDataChan)
}
服务端启动启动时,首先创建一个server,然后启动server。
var serverCmd = &cobra.Command{
Use: "server",
Short: "server manage",
Run: func(cmd *cobra.Command, args []string) {
port, err := cmd.Flags().GetInt("port")
if err != nil {
log.Println("get param error: ", err)
}
// 创建server
server := NewServer(port)
// 开始server
server.start()
},
}
创建服务端与创建客户端类似。
func NewNode(port int) *Node {
// 生成密钥对
r := rand.Reader
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil{
log.Println(err)
}
pubKey := prvKey.GetPublic()
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", port))
// 创建libp2p节点
h, err := libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
if err != nil {
log.Println(err)
}
h.SetStreamHandler(protocol.ID(protocolID), handleStream)
fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\n", "0.0.0.0", port, h.ID().Pretty())
keyPair := Keypair{
privkey: prvKey,
pubkey: pubKey,
}
// 创建node
return &Node{
[]KnownNode{},
ClientNode{},
0,
h,
ViewID,
make(chan []byte),
keyPair,
&MsgLog{
make(map[string]map[string]bool),
make(map[string]map[string]bool),
make(map[string]map[string]bool),
make(map[string]bool),
},
make(map[string]*common.RequestMsg),
sync.Mutex{},
}
}
服务端的启动仅仅开启一个消息处理协程。 通过消息处理协程,会把服务端接收到的消息分配到对应的逻辑中进行处理。
func (node *Node)Start() {
// 处理消息
go node.handleMsg()
}
func (node *Node) handleMsg() {
fmt.Println(">>> 启动节点,等待接收消息...")
for {
// 待改进 todo
rawData := <- receiveChan
// 反序列得到的数据
var data []byte
err := json.Unmarshal([]byte(rawData), &data)
if err != nil {
fmt.Println("反序列消息化失败:", err)
return
}
// 分割消息,分别处理不同的消息
header, payload, sign:= common.SplitMsg(data)
switch header {
case common.HRequest:
node.handleRequest(payload, sign)
case common.HPrePrepare:
node.handlePrePrepare(payload, sign)
case common.HPrepare:
node.handlePrepare(payload, sign)
case common.HCommit:
node.handleCommit(payload, sign)
default:
fmt.Println("===============无法处理对应的消息============")
}
}
}
节点接收到消息后,首先反序列化request消息,然后设置客户端。接着校验request消息的摘要及签名。通过验证后放入请求池,接着创建pre-prepare消息并进行签名。最组合消息进行发送。剩下的三个函数类似,这里就不再叙述。
func (node *Node) handleRequest(payload []byte, sig []byte) {
fmt.Println(">>> 主节点接收request消息...")
var request common.RequestMsg
var prePrepareMsg common.PrePrepareMsg
// 反序列化请求消息
err := json.Unmarshal(payload, &request)
if err != nil{
log.Println("反序列化request错误: ", err)
return
}
// 设置节点的客户端
clientPubKey, err := crypto.UnmarshalPublicKey(request.Client.PubKey)
if err != nil{
fmt.Println(">>> 反序列化客户端公钥失败", err)
}
clientNode := ClientNode{
request.Client.ID,
clientPubKey,
}
node.clientNode = clientNode
// 校验request的摘要
vdig := common.VerifyDigest(request.CRequest.Message, request.CRequest.Digest)
if vdig == false {
fmt.Printf("验证摘要错误\n")
return
}
// 校验request的签名
_, err = common.VerifySignatrue(request, sig, clientPubKey)
if err != nil {
fmt.Printf("验证签名错误:%v\n", err)
return
}
// 添加进请求池
node.mutex.Lock()
node.requestPool[request.CRequest.Digest] = &request
seqID := node.getSequenceID()
node.mutex.Unlock()
// 构建pre-Prepare消息
prePrepareMsg = common.PrePrepareMsg{
request,
request.CRequest.Digest,
ViewID,
seqID,
}
// 消息签名
msgSig, err:= node.signMessage(prePrepareMsg)
if err != nil{
fmt.Printf("%v\n", err)
return
}
// 消息组合
msg := common.ComposeMsg(common.HPrePrepare, prePrepareMsg, msgSig)
// 日志处理
node.mutex.Lock()
if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil {
node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
}
node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
node.mutex.Unlock()
// 序列化消息
data, err := json.Marshal(msg)
if err != nil{
fmt.Println("序列化request消息出错", err)
return
}
fmt.Println(">>> 主节点广播prePrepare消息...")
// 广播消息
node.broadcast(data)
}
func (node *Node) handlePrePrepare(payload []byte, sig []byte) {
fmt.Println(">>> 副节点开始接收prePrepare消息...")
// 反序列化prePrepare消息
var prePrepareMsg common.PrePrepareMsg
err := json.Unmarshal(payload,&prePrepareMsg)
if err != nil {
fmt.Printf("error happened:%v", err)
return
}
// 找到主节点的公钥
pnodeId := node.findPrimaryNode()
pubKey, err := pnodeId.h.ID.ExtractPublicKey()
if err != nil {
fmt.Println("获取主节点的公钥失败", err)
return
}
// 校验消息签名
_, err = common.VerifySignatrue(prePrepareMsg, sig, pubKey)
if err != nil {
fmt.Printf("验证主节点签名错误:%v\n", err)
return
}
// 校验消息的摘要
if prePrepareMsg.Digest != prePrepareMsg.Request.CRequest.Digest {
fmt.Printf("校验摘要错误\n")
return
}
node.mutex.Lock()
node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
node.mutex.Unlock()
// 校验request的摘要
err = node.verifyRequestDigest(prePrepareMsg.Digest)
if err != nil{
fmt.Printf("%v\n", err)
return
}
node.mutex.Lock()
node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
node.mutex.Unlock()
err = node.verifyRequestDigest(prePrepareMsg.Digest)
if err != nil{
fmt.Printf("%v\n", err)
return
}
node.mutex.Lock()
if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil {
node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
}
node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
node.mutex.Unlock()
// 构建prePare消息
prepareMsg := common.PrepareMsg{
prePrepareMsg.Digest,
ViewID,
prePrepareMsg.SequenceID,
node.node.ID(),
}
// 签名
msgSig, err := common.SignMessage(prepareMsg, node.keypair.privkey)
if err != nil{
fmt.Printf("%v\n", err)
return
}
// 消息组合
sendMsg := common.ComposeMsg(common.HPrepare,prepareMsg,msgSig)
// 序列化消息
data, err := json.Marshal(sendMsg)
if err != nil{
fmt.Println("序列化prepare消息出错", err)
return
}
fmt.Println(">>> 副节点广播prepare消息...")
node.broadcast(data)
}
func (node *Node) handlePrepare(payload []byte, sig []byte) {
fmt.Println(">>> 副节点开始接收prepare消息...")
// 反序列化prepare消息
var prepareMsg common.PrepareMsg
err := json.Unmarshal(payload,&prepareMsg)
if err != nil {
fmt.Printf("error happened:%v", err)
return
}
// 得到节点的公钥
pnodeID := prepareMsg.NodeID
pubKey, err:= findNodePubkey(pnodeID)
if err != nil {
fmt.Println("获取主节点的公钥失败", err)
return
}
_, err = common.VerifySignatrue(prepareMsg, sig, pubKey)
if err != nil {
fmt.Printf("校验签名prepare消息错误:%v\n", err)
return
}
err = node.verifyRequestDigest(prepareMsg.Digest)
if err != nil{
fmt.Printf("%v\n", err)
return
}
// 日记记录
node.mutex.Lock()
if node.msgLog.prepareLog[prepareMsg.Digest] == nil {
node.msgLog.prepareLog[prepareMsg.Digest] = make(map[string]bool)
}
node.msgLog.prepareLog[prepareMsg.Digest][prepareMsg.NodeID.String()] = true
node.mutex.Unlock()
// if receive prepare msg >= 2f +1, then broadcast commit msg
limit := node.countNeedReceiveMsgAmount()
sum, err := node.findVerifiedPrepareMsgCount(prepareMsg.Digest)
if err != nil {
fmt.Printf("error happened:%v", err)
return
}
if sum >= limit {
//send commit msg
commitMsg := common.CommitMsg{
prepareMsg.Digest,
prepareMsg.ViewID,
prepareMsg.SequenceID,
node.node.ID(),
}
sig, err := node.signMessage(commitMsg)
if err != nil{
fmt.Printf("sign message happened error:%v\n", err)
}
sendMsg := common.ComposeMsg(common.HCommit,commitMsg,sig)
data, err := json.Marshal(sendMsg)
if err != nil{
fmt.Println("序列化commit消息出错", err)
}
node.broadcast(data)
fmt.Println(">>> 副节点广播commit消息成功")
}
}
func (node *Node) handleCommit(payload []byte, sig []byte) {
fmt.Println(">>> 副节点开始接收commit消息")
// 反序列化消息
var commitMsg common.CommitMsg
err := json.Unmarshal(payload,&commitMsg)
if err != nil {
fmt.Printf("error happened:%v", err)
}
msgPubKey, err := findNodePubkey(commitMsg.NodeID)
if err != nil{
fmt.Println(err)
return
}
verify, err := common.VerifySignatrue(commitMsg, sig, msgPubKey)
if err != nil {
fmt.Printf("verify signature failed:%v\n", err)
return
}
if verify == false {
fmt.Printf("verify signature failed\n")
return
}
err = node.verifyRequestDigest(commitMsg.Digest)
if err != nil{
fmt.Printf("%v\n", err)
return
}
node.mutex.Lock()
if node.msgLog.commitLog[commitMsg.Digest] == nil {
node.msgLog.commitLog[commitMsg.Digest] = make(map[string]bool)
}
node.msgLog.commitLog[commitMsg.Digest][commitMsg.NodeID.String()] = true
node.mutex.Unlock()
// if receive commit msg >= 2f +1, then send reply msg to client
limit := node.countNeedReceiveMsgAmount()
sum, err := node.findVerifiedCommitMsgCount(commitMsg.Digest)
if err != nil{
fmt.Printf("error happened:%v", err)
return
}
if sum >= limit {
// if already send reply msg, then do nothing
node.mutex.Lock()
exist := node.msgLog.replyLog[commitMsg.Digest]
node.mutex.Unlock()
if exist == true {
return
}
// send reply msg
node.mutex.Lock()
requestMsg := node.requestPool[commitMsg.Digest]
node.mutex.Unlock()
fmt.Printf("operstion:%s message:%s executed... \n",requestMsg.Operation, requestMsg.CRequest.Message)
done := fmt.Sprintf("operstion:%s message:%s done ",requestMsg.Operation, requestMsg.CRequest.Message)
replyMsg := common.ReplyMsg{
node.View,
int(time.Now().Unix()),
requestMsg.Client.ID,
node.node.ID().String(),
done,
}
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", requestMsg.Client.ID))
clientNode, err := peer.AddrInfoFromString(hostAddr.String())
fmt.Println(">>> 客户端地址:", clientNode.ID.Pretty())
if err != nil{
fmt.Println(err)
return
}
fmt.Println(">>> 开始向客户端回复数据...")
sendMsg := common.ComposeMsg(common.HReply,replyMsg,[]byte{})
data, err := json.Marshal(sendMsg)
if err != nil{
fmt.Println("序列化commit消息出错", err)
}
node.reply(context.Background(), data, *clientNode)
node.mutex.Lock()
node.msgLog.replyLog[commitMsg.Digest] = true
node.mutex.Unlock()
fmt.Println(">>> 回复客户端成功...")
}
}

项目中做了很多简化并且有很多设计不合理的地方,以后会继续进行改进。源码:https://github.com/blockchainGuide/Consensus_Algorithm
如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby
在rails源中:https://github.com/rails/rails/blob/master/activesupport/lib/active_support/lazy_load_hooks.rb可以看到以下内容@load_hooks=Hash.new{|h,k|h[k]=[]}在IRB中,它只是初始化一个空哈希。和做有什么区别@load_hooks=Hash.new 最佳答案 查看rubydocumentationforHashnew→new_hashclicktotogglesourcenew(obj)→new_has
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
我的主要目标是能够完全理解我正在使用的库/gem。我尝试在Github上从头到尾阅读源代码,但这真的很难。我认为更有趣、更温和的踏脚石就是在使用时阅读每个库/gem方法的源代码。例如,我想知道RubyonRails中的redirect_to方法是如何工作的:如何查找redirect_to方法的源代码?我知道在pry中我可以执行类似show-methodmethod的操作,但我如何才能对Rails框架中的方法执行此操作?您对我如何更好地理解Gem及其API有什么建议吗?仅仅阅读源代码似乎真的很难,尤其是对于框架。谢谢! 最佳答案 Ru
我的假设是moduleAmoduleBendend和moduleA::Bend是一样的。我能够从thisblog找到解决方案,thisSOthread和andthisSOthread.为什么以及什么时候应该更喜欢紧凑语法A::B而不是另一个,因为它显然有一个缺点?我有一种直觉,它可能与性能有关,因为在更多命名空间中查找常量需要更多计算。但是我无法通过对普通类进行基准测试来验证这一点。 最佳答案 这两种写作方法经常被混淆。首先要说的是,据我所知,没有可衡量的性能差异。(在下面的书面示例中不断查找)最明显的区别,可能也是最著名的,是你的
几个月前,我读了一篇关于rubygem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
前言作为一名程序员,自己的本质工作就是做程序开发,那么程序开发的时候最直接的体现就是代码,检验一个程序员技术水平的一个核心环节就是开发时候的代码能力。众所周知,程序开发的水平提升是一个循序渐进的过程,每一位程序员都是从“菜鸟”变成“大神”的,所以程序员在程序开发过程中的代码能力也是根据平时开发中的业务实践来积累和提升的。提高代码能力核心要素程序员要想提高自身代码能力,尤其是新晋程序员的代码能力有很大的提升空间的时候,需要针对性的去提高自己的代码能力。提高代码能力其实有几个比较关键的点,只要把握住这些方面,就能很好的、快速的提高自己的一部分代码能力。1、多去阅读开源项目,如有机会可以亲自参与开源
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
嗨~大家好,这里是可莉!今天给大家带来的是7个C语言的经典基础代码~那一起往下看下去把【程序一】打印100到200之间的素数#includeintmain(){ inti; for(i=100;i 【程序二】输出乘法口诀表#includeintmain(){inti;for(i=1;i 【程序三】判断1000年---2000年之间的闰年#includeintmain(){intyear;for(year=1000;year 【程序四】给定两个整形变量的值,将两个值的内容进行交换。这里提供两种方法来进行交换,第一种为创建临时变量来进行交换,第二种是不创建临时变量而直接进行交换。1.创建临时变量来