内容简介:nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。nat下面有upnp和pmp两种网络协议。upnp的应用场景(pmp是和upnp类似的协议)
nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。
nat下面有upnp和pmp两种网络协议。
upnp的应用场景(pmp是和upnp类似的协议)
如果用户是通过NAT接入Internet的,同时需要使用BC、电骡eMule等P2P这样的软件,这时UPnP功能就会带来很大的便利。利用UPnP能自动的把BC、电骡eMule等侦听的端口号映射到公网上,以便公网上的用户也能对NAT私网侧发起连接。
主要功能就是提供接口可以把内网的IP+端口 映射为 路由器的IP+端口。 这样就等于内网的程序有了外网的IP地址, 这样公网的用户就可以直接对你进行访问了。 不然就需要通过UDP打洞这种方式来进行访问。
p2p中的UDP协议
现在大部分用户运行的环境都是内网环境。 内网环境下监听的端口,其他公网的程序是无法直接访问的。需要经过一个打洞的过程。 双方才能联通。这就是所谓的UDP打洞。
在p2p代码里面。 peer代表了一条创建好的网络链路。在一条链路上可能运行着多个协议。比如以太坊的协议(eth)。 Swarm的协议。 或者是Whisper的协议。
peer的结构
type protoRW struct { Protocol in chan Msg // receices read messages closed <-chan struct{} // receives when peer is shutting down wstart <-chan struct{} // receives when write may start werr chan<- error // for write results offset uint64 w MsgWriter } // Protocol represents a P2P subprotocol implementation. type Protocol struct { // Name should contain the official protocol name, // often a three-letter word. Name string // Version should contain the version number of the protocol. Version uint // Length should contain the number of message codes used // by the protocol. Length uint64 // Run is called in a new groutine when the protocol has been // negotiated with a peer. It should read and write messages from // rw. The Payload for each message must be fully consumed. // // The peer connection is closed when Start returns. It should return // any protocol-level error (such as an I/O error) that is // encountered. Run func(peer *Peer, rw MsgReadWriter) error // NodeInfo is an optional helper method to retrieve protocol specific metadata // about the host node. NodeInfo func() interface{} // PeerInfo is an optional helper method to retrieve protocol specific metadata // about a certain peer in the network. If an info retrieval function is set, // but returns nil, it is assumed that the protocol handshake is still running. PeerInfo func(id discover.NodeID) interface{} } // Peer represents a connected remote node. type Peer struct { rw *conn running map[string]*protoRW //运行的协议 log log.Logger created mclock.AbsTime wg sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason // events receives message send / receive events if set events *event.Feed }
peer的创建,根据匹配找到当前Peer支持的protomap
func newPeer(conn conn, protocols []Protocol) Peer {
protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ rw: conn, running: protomap, created: mclock.Now(), disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), log: log.New("id", conn.id, "conn", conn.flags), } return p
}
peer的启动, 启动了两个goroutine线程。 一个是读取。一个是执行ping操作。
func (p *Peer) run() (remoteRequested bool, err error) {
var ( writeStart = make(chan struct{}, 1) //用来控制什么时候可以写入的管道。 writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason // sent to the peer ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() // Start all protocol handlers. writeStart <- struct{}{} //启动所有的协议。 p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect.
loop:
for { select { case err = <-writeErr: // A write finished. Allow the next write to start if // there was no error. if err != nil { reason = DiscNetworkError break loop } writeStart <- struct{}{} case err = <-readErr: if r, ok := err.(DiscReason); ok { remoteRequested = true reason = r } else { reason = DiscNetworkError } break loop case err = <-p.protoErr: reason = discReasonForError(err) break loop case err = <-p.disc: break loop } } close(p.closed) p.rw.close(reason) p.wg.Wait() return remoteRequested, err
}
startProtocols方法,这个方法遍历所有的协议。
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr var rw MsgReadWriter = proto if p.events != nil { rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) // 等于这里为每一个协议都开启了一个goroutine。 调用其Run方法。 go func() { // proto.Run(p, rw)这个方法应该是一个死循环。 如果返回就说明遇到了错误。 err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned } else if err != io.EOF { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done() }() }
}
回过头来再看看readLoop方法。 这个方法也是一个死循环。 调用p.rw来读取一个Msg(这个rw实际是之前提到的frameRLPx的对象,也就是分帧之后的对象。然后根据Msg的类型进行对应的处理,如果Msg的类型是内部运行的协议的类型。那么发送到对应协议的proto.in队列上面。
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done() for { msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return } }
}
func (p *Peer) handle(msg Msg) error {
switch { case msg.Code == pingMsg: msg.Discard() go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() default: // it's a subprotocol message proto, err := p.getProto(msg.Code) if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } select { case proto.in <- msg: return nil case <-p.closed: return io.EOF } } return nil
}
在看看pingLoop。这个方法很简单。就是定时的发送pingMsg消息到对端。
func (p *Peer) pingLoop() {
ping := time.NewTimer(pingInterval) defer p.wg.Done() defer ping.Stop() for { select { case <-ping.C: if err := SendItems(p.rw, pingMsg); err != nil { p.protoErr <- err return } ping.Reset(pingInterval) case <-p.closed: return } }
}
最后再看看protoRW的read和write方法。 可以看到读取和写入都是阻塞式的。
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset select { case <-rw.wstart: //等到可以写入的受在执行写入。 这难道是为了多线程控制么。 err = rw.w.WriteMsg(msg) // Report write status back to Peer.run. It will initiate // shutdown if the error is non-nil and unblock the next write // otherwise. The calling protocol code should exit for errors // as well but we don't want to rely on that. rw.werr <- err case <-rw.closed: err = fmt.Errorf("shutting down") } return err
}
func (rw *protoRW) ReadMsg() (Msg, error) {
select { case msg := <-rw.in: msg.Code -= rw.offset return msg, nil case <-rw.closed: return Msg{}, io.EOF }
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Zen of CSS Design
Dave Shea、Molly E. Holzschlag / Peachpit Press / 2005-2-27 / USD 44.99
Proving once and for all that standards-compliant design does not equal dull design, this inspiring tome uses examples from the landmark CSS Zen Garden site as the foundation for discussions on how to......一起来看看 《The Zen of CSS Design》 这本书的介绍吧!