【源码阅读】交易池txs_fetcher&txs_noncer

【源码阅读】交易池txs_fetcher&txs_noncertxs noncer

大家好,欢迎来到IT知识分享网。

txs_noncer

1、 txNoncer

type txNoncer struct { 
    fallback evmtypes.IntraBlockState nonces map[types.Address]uint64 lock sync.Mutex } 
  1. fallback evmtypes.IntraBlockState:表示回退状态,用于在没有找到对应地址的nonce时使用。
  2. nonces map[types.Address]uint64:表示一个映射,用于存储每个地址对应的nonce值。
  3. lock sync.Mutex:表示一个互斥锁,用于在多线程环境下保护对nonces映射的操作。

1.1 newTxNoncer

func newTxNoncer(db evmtypes.IntraBlockState) *txNoncer { 
    return &txNoncer{ 
    fallback: db, nonces: make(map[types.Address]uint64), } } 

创建一个虚拟的状态数据库来追踪交易池里面的nonce

1.2 get

func (txn *txNoncer) get(addr types.Address) uint64 
  • get 返回帐户的当前随机数return txn.nonces[addr]
  • 如果找不到,则回退到真实状态数据库txn.nonces[addr] = txn.fallback.GetNonce(addr)
  • 需要上锁txn.lock.Lock()

1.3 set

func (txn *txNoncer) set(addr types.Address, nonce uint64) 
  • set 将新的虚拟随机数插入到虚拟状态数据库中txn.nonces[addr] = nonce
  • 需要上锁txn.lock.Lock()

1.4 setIfLower

func (txn *txNoncer) setIfLower(addr types.Address, nonce uint64) 
  • 如果新的虚拟随机数较低,则 setIfLower 将新的虚拟随机数更新到虚拟状态数据库中。
if txn.nonces[addr] <= nonce { 
    return } txn.nonces[addr] = nonce 
  • 需要上锁txn.lock.Lock()
  • 如果找不到,则回退到真实状态数据库txn.nonces[addr] = txn.fallback.GetNonce(addr)

1.5 setAll

func (txn *txNoncer) setAll(all map[types.Address]uint64) 
  • setAll 将所有帐户的随机数设置为给定映射txn.nonces = all
  • 需要上锁txn.lock.Lock()

2、txsRequest

type txsRequest struct { 
    hashes hashes bloom *types.Bloom peer peer.ID } 
  1. hashes hashes:表示交易哈希值。
  2. bloom *types.Bloom:表示布隆过滤器,用于快速检查某个哈希值是否在集合中。
  3. peer peer.ID:表示请求发送者的节点ID。

3、TxsFetcher

type TxsFetcher struct { 
    quit chan struct{ 
   } finished bool peers common.PeerMap p2pServer common.INetwork peerRequests map[peer.ID]*txsRequest // other peer requests bloom *types.Bloom fetched map[types.Hash]bool // getTx func(hash types.Hash) *transaction.Transaction addTxs func([]*transaction.Transaction) []error pendingTxs func(enforceTips bool) map[types.Address][]*transaction.Transaction // fetchTxs //fetchTxs func(string, []types.Hash) // ch peerJoinCh chan *common.PeerJoinEvent peerDropCh chan *common.PeerDropEvent // ctx context.Context cancel context.CancelFunc } 
  1. quit chan struct{}:一个用于退出的通道。
  2. finished bool:表示是否已完成。
  3. peers common.PeerMap:表示节点映射。
  4. p2pServer common.INetwork:表示P2P网络接口。
  5. peerRequests map[peer.ID]*txsRequest:表示其他节点请求。
  6. bloom *types.Bloom:表示布隆过滤器。
  7. fetched map[types.Hash]bool:表示已获取的交易哈希值。
  8. getTx func(hash types.Hash) *transaction.Transaction:表示获取交易的函数。
  9. addTxs func([]*transaction.Transaction) []error:表示添加交易的函数。
  10. pendingTxs func(enforceTips bool) map[types.Address][]*transaction.Transaction:表示获取待处理交易的函数。
  11. peerJoinCh chan *common.PeerJoinEvent:表示节点加入事件的通道。
  12. peerDropCh chan *common.PeerDropEvent:表示节点离开事件的通道。
  13. ctx context.Context:表示上下文。
  14. cancel context.CancelFunc:表示取消函数。

3.1 Start

func (f TxsFetcher) Start() error { 
    go f.sendBloomTransactionLoop() go f.bloomBroadcastLoop() return nil } 

调用sendBloomTransactionLoopbloomBroadcastLoop

3.1.1 sendBloomTransactionLoop

func (f TxsFetcher) sendBloomTransactionLoop() 
  • 创建定时器tick := time.NewTicker(BloomSendTransactionTime),在时间内进行
  • 片段1
for i := 0; i < BloomSendMaxTransactions; i++ { 
    hash := req.hashes.pop() tx := f.getTx(hash) txs = append(txs, tx.ToProtoMessage().(*types_pb.Transaction)) } 

在定时时间内,根据网络获得hash,从而通过getTx获得交易,添加进交易列表中

  • 片段2
msg := &sync_proto.SyncTask{ 
    Id: rand.Uint64(), Ok: true, SyncType: sync_proto.SyncType_TransactionRes, Payload: &sync_proto.SyncTask_SyncTransactionResponse{ 
    SyncTransactionResponse: &sync_proto.SyncTransactionResponse{ 
    Transactions: txs, }, }, } 

这段代码的作用是创建一个同步任务,并将其赋值给msg变量。

  1. Id:一个随机生成的无符号64位整数,用于唯一标识同步任务。
  2. Ok:一个布尔值,表示同步任务是否成功。在这个例子中,它被设置为true
  3. SyncType:一个枚举类型,表示同步任务的类型。在这个例子中,它被设置为sync_proto.SyncType_TransactionRes,表示这是一个事务响应类型的同步任务。
  4. Payload:一个指向sync_proto.SyncTask_SyncTransactionResponse类型的指针,表示同步任务的有效载荷。在这个例子中,它包含了一个名为SyncTransactionResponse的字段,该字段是一个指向sync_proto.SyncTransactionResponse类型的指针,其中包含了一个名为Transactions的字段,该字段是一个包含多个交易信息的切片。
  • 如果获得通道信息,就表示已完成,则返回case <-f.ctx.Done():

3.1.2 bloomBroadcastLoop

func (f TxsFetcher) bloomBroadcastLoop() 
  • 创建定时器tick := time.NewTicker(BloomFetcherMaxTime)
  • 序列化为字节bloom, err := f.bloom.Marshal()
  • 片段1
msg := &sync_proto.SyncTask{ 
    Id: rand.Uint64(), Ok: true, SyncType: sync_proto.SyncType_TransactionReq, Payload: &sync_proto.SyncTask_SyncTransactionRequest{ 
    SyncTransactionRequest: &sync_proto.SyncTransactionRequest{ 
    Bloom: bloom, }, }, } 

理解同上,创建一个同步任务,并将其赋值给msg变量。

  • 这段代码是Go语言编写的,它处理了一个名为peerJoinCh的通道。当从该通道接收到数据时,它会将数据存储在f.peers字典中,并使用WriteMsg方法向对应的peer发送一个消息。

以下是对代码的解析:

case peerId := <-f.peerJoinCh: // 从 f.peerJoinCh 通道接收数据,并将其赋值给 peerId 变量 // peerId 是一个结构体,包含了要加入的 peer 的信息 f.peers[peerId.Peer].WriteMsg(message.MsgTransaction, request) // 通过 peerId.Peer 获取对应的 peer 对象 // 使用 WriteMsg 方法向该 peer 发送一个类型为 message.MsgTransaction 的消息,并将 request 作为参数传递 
  • peerJoinCh 用于接收新加入的 peer 的信息,而 peers 用于存储已加入的 peer 对象。
case peerId := <-f.peerJoinCh: f.peers[peerId.Peer].WriteMsg(message.MsgTransaction, request) 

4 ConnHandler

func (f TxsFetcher) ConnHandler(data []byte, ID peer.ID) error 
  • 根据id是否能获取返回值_, ok := f.peers[ID]
  • 获取同步任务syncTask := sync_proto.SyncTask{}
  • 处理同步任务的请求
  1. 当接收到类型为sync_proto.SyncType_TransactionReq的同步任务时,它会解析请求中的事务信息,并检查是否已经存在该peer的请求_, ok := f.peerRequests[ID]
  2. 如果不存在,则创建一个新的bloom过滤器bloom := new(types.Bloom)并将请求中的bloom数据反序列化到新的bloom过滤器中
  3. 然后创建交易列表txs和哈希hash
  4. 遍历所有待处理的交易pending,将不在bloom过滤器中的交易哈希值if !bloom.Contain(hash.Bytes())添加到一个列表中hashes = append(hashes, hash)
  5. 将这些哈希值存储在f.peerRequests[ID]中,以便后续处理
f.peerRequests[ID] = &txsRequest{ 
    bloom: bloom, hashes: hashes, peer: ID, } 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/155504.html

(0)
上一篇 2025-02-18 20:15
下一篇 2025-02-18 20:20

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信