兄弟连区块链入门教程以太坊源码分析p2p-peer.go源码分析

栏目: 服务器 · 发布时间: 6年前

内容简介:nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。nat下面有upnp和pmp两种网络协议。upnp的应用场景(pmp是和upnp类似的协议)

nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。

nat下面有upnp和pmp两种网络协议。

upnp的应用场景(pmp是和upnp类似的协议)

如果用户是通过NAT接入Internet的,同时需要使用BC、电骡eMule等P2P这样的软件,这时UPnP功能就会带来很大的便利。利用UPnP能自动的把BC、电骡eMule等侦听的端口号映射到公网上,以便公网上的用户也能对NAT私网侧发起连接。

主要功能就是提供接口可以把内网的IP+端口 映射为 路由器的IP+端口。 这样就等于内网的程序有了外网的IP地址, 这样公网的用户就可以直接对你进行访问了。 不然就需要通过UDP打洞这种方式来进行访问。

p2p中的UDP协议

现在大部分用户运行的环境都是内网环境。 内网环境下监听的端口,其他公网的程序是无法直接访问的。需要经过一个打洞的过程。 双方才能联通。这就是所谓的UDP打洞。

在p2p代码里面。 peer代表了一条创建好的网络链路。在一条链路上可能运行着多个协议。比如以太坊的协议(eth)。 Swarm的协议。 或者是Whisper的协议。

peer的结构

type protoRW struct {
    Protocol
    in     chan Msg        // receices read messages
    closed <-chan struct{} // receives when peer is shutting down
    wstart <-chan struct{} // receives when write may start
    werr   chan<- error    // for write results
    offset uint64
    w      MsgWriter
}

// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
    // Name should contain the official protocol name,
    // often a three-letter word.
    Name string

    // Version should contain the version number of the protocol.
    Version uint

    // Length should contain the number of message codes used
    // by the protocol.
    Length uint64

    // Run is called in a new groutine when the protocol has been
    // negotiated with a peer. It should read and write messages from
    // rw. The Payload for each message must be fully consumed.
    //
    // The peer connection is closed when Start returns. It should return
    // any protocol-level error (such as an I/O error) that is
    // encountered.
    Run func(peer *Peer, rw MsgReadWriter) error

    // NodeInfo is an optional helper method to retrieve protocol specific metadata
    // about the host node.
    NodeInfo func() interface{}

    // PeerInfo is an optional helper method to retrieve protocol specific metadata
    // about a certain peer in the network. If an info retrieval function is set,
    // but returns nil, it is assumed that the protocol handshake is still running.
    PeerInfo func(id discover.NodeID) interface{}
}

// Peer represents a connected remote node.
type Peer struct {
    rw      *conn
    running map[string]*protoRW   //运行的协议
    log     log.Logger
    created mclock.AbsTime

    wg       sync.WaitGroup
    protoErr chan error
    closed   chan struct{}
    disc     chan DiscReason

    // events receives message send / receive events if set
    events *event.Feed
}

peer的创建,根据匹配找到当前Peer支持的protomap

func newPeer(conn conn, protocols []Protocol) Peer {

protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
    rw:       conn,
    running:  protomap,
    created:  mclock.Now(),
    disc:     make(chan DiscReason),
    protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
    closed:   make(chan struct{}),
    log:      log.New("id", conn.id, "conn", conn.flags),
}
return p

}

peer的启动, 启动了两个goroutine线程。 一个是读取。一个是执行ping操作。

func (p *Peer) run() (remoteRequested bool, err error) {

var (
    writeStart = make(chan struct{}, 1)  //用来控制什么时候可以写入的管道。
    writeErr   = make(chan error, 1)
    readErr    = make(chan error, 1)
    reason     DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()

// Start all protocol handlers.
writeStart <- struct{}{}
//启动所有的协议。
p.startProtocols(writeStart, writeErr)

// Wait for an error or disconnect.

loop:

for {
    select {
    case err = <-writeErr:
        // A write finished. Allow the next write to start if
        // there was no error.
        if err != nil {
            reason = DiscNetworkError
            break loop
        }
        writeStart <- struct{}{}
    case err = <-readErr:
        if r, ok := err.(DiscReason); ok {
            remoteRequested = true
            reason = r
        } else {
            reason = DiscNetworkError
        }
        break loop
    case err = <-p.protoErr:
        reason = discReasonForError(err)
        break loop
    case err = <-p.disc:
        break loop
    }
}

close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err

}

startProtocols方法,这个方法遍历所有的协议。

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {

p.wg.Add(len(p.running))
for _, proto := range p.running {
    proto := proto
    proto.closed = p.closed
    proto.wstart = writeStart
    proto.werr = writeErr
    var rw MsgReadWriter = proto
    if p.events != nil {
        rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
    }
    p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
    // 等于这里为每一个协议都开启了一个goroutine。 调用其Run方法。
    go func() {
        // proto.Run(p, rw)这个方法应该是一个死循环。 如果返回就说明遇到了错误。
        err := proto.Run(p, rw)
        if err == nil {
            p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
            err = errProtocolReturned
        } else if err != io.EOF {
            p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
        }
        p.protoErr <- err
        p.wg.Done()
    }()
}

}

回过头来再看看readLoop方法。 这个方法也是一个死循环。 调用p.rw来读取一个Msg(这个rw实际是之前提到的frameRLPx的对象,也就是分帧之后的对象。然后根据Msg的类型进行对应的处理,如果Msg的类型是内部运行的协议的类型。那么发送到对应协议的proto.in队列上面。

func (p *Peer) readLoop(errc chan<- error) {

defer p.wg.Done()
for {
    msg, err := p.rw.ReadMsg()
    if err != nil {
        errc <- err
        return
    }
    msg.ReceivedAt = time.Now()
    if err = p.handle(msg); err != nil {
        errc <- err
        return
    }
}

}

func (p *Peer) handle(msg Msg) error {

switch {
case msg.Code == pingMsg:
    msg.Discard()
    go SendItems(p.rw, pongMsg)
case msg.Code == discMsg:
    var reason [1]DiscReason
    // This is the last message. We don't need to discard or
    // check errors because, the connection will be closed after it.
    rlp.Decode(msg.Payload, &reason)
    return reason[0]
case msg.Code < baseProtocolLength:
    // ignore other base protocol messages
    return msg.Discard()
default:
    // it's a subprotocol message
    proto, err := p.getProto(msg.Code)
    if err != nil {
        return fmt.Errorf("msg code out of range: %v", msg.Code)
    }
    select {
    case proto.in <- msg:
        return nil
    case <-p.closed:
        return io.EOF
    }
}
return nil

}

在看看pingLoop。这个方法很简单。就是定时的发送pingMsg消息到对端。

func (p *Peer) pingLoop() {

ping := time.NewTimer(pingInterval)
defer p.wg.Done()
defer ping.Stop()
for {
    select {
    case <-ping.C:
        if err := SendItems(p.rw, pingMsg); err != nil {
            p.protoErr <- err
            return
        }
        ping.Reset(pingInterval)
    case <-p.closed:
        return
    }
}

}

最后再看看protoRW的read和write方法。 可以看到读取和写入都是阻塞式的。

func (rw *protoRW) WriteMsg(msg Msg) (err error) {

if msg.Code >= rw.Length {
    return newPeerError(errInvalidMsgCode, "not handled")
}
msg.Code += rw.offset
select {
case <-rw.wstart:  //等到可以写入的受在执行写入。 这难道是为了多线程控制么。
    err = rw.w.WriteMsg(msg)
    // Report write status back to Peer.run. It will initiate
    // shutdown if the error is non-nil and unblock the next write
    // otherwise. The calling protocol code should exit for errors
    // as well but we don't want to rely on that.
    rw.werr <- err
case <-rw.closed:
    err = fmt.Errorf("shutting down")
}
return err

}

func (rw *protoRW) ReadMsg() (Msg, error) {

select {
case msg := <-rw.in:
    msg.Code -= rw.offset
    return msg, nil
case <-rw.closed:
    return Msg{}, io.EOF
}

}


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Zen of CSS Design

The Zen of CSS Design

Dave Shea、Molly E. Holzschlag / Peachpit Press / 2005-2-27 / USD 44.99

Proving once and for all that standards-compliant design does not equal dull design, this inspiring tome uses examples from the landmark CSS Zen Garden site as the foundation for discussions on how to......一起来看看 《The Zen of CSS Design》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具