Netty源码分析之ChannelPipeline

栏目: 编程工具 · 发布时间: 7年前

内容简介:每个channel内部都会持有一个ChannelPipeline对象pipeline. pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。当channel完成register、active、read等操作时,会触发pipeline的相应方法。

每个channel内部都会持有一个ChannelPipeline对象pipeline. pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。

Netty源码分析之ChannelPipeline

当channel完成register、active、read等操作时,会触发pipeline的相应方法。

1、当channel注册到selector时,触发pipeline的fireChannelRegistered方法。

2、当channel的socket绑定完成时,触发pipeline的fireChannelActive方法。

3、当有客户端请求时,触发pipeline的fireChannelRead方法。

4、当本次客户端请求,pipeline执行完fireChannelRead,触发pipeline的fireChannelReadComplete方法。

接下去看看pipeline是如何组织并运行handler对应的方法。

DefaultChannelPipeline

其中DefaultChannelHandlerContext保存了当前handler的上下文,如channel、pipeline等信息,默认实现了head和tail。

class DefaultChannelPipeline implements ChannelPipeline {
 final Channel channel; // pipeline所属的channel
 //head和tail都是handler上下文
 final DefaultChannelHandlerContext head;
 final DefaultChannelHandlerContext tail;
 ...
 public DefaultChannelPipeline(AbstractChannel channel) {
 if (channel == null) {
 throw new NullPointerException("channel");
 }
 this.channel = channel;

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
 } 
}

1、TailContext实现了ChannelOutboundHandler接口。

2、HeadContext实现了ChannelInboundHandler接口。

3、head和tail形成了一个链表。

对于Inbound的操作,当channel注册到selector时,触发pipeline的fireChannelRegistered,从head开始遍历,找到实现了ChannelInboundHandler接口的handler,并执行其fireChannelRegistered方法。

@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
 return this;
}

@Override
public ChannelHandlerContext fireChannelRegistered() {
 final DefaultChannelHandlerContext next = findContextInbound();
 EventExecutor executor = next.executor();
 if (executor.inEventLoop()) {
 next.invokeChannelRegistered();
 } else {
        executor.execute(new Runnable() {
 @Override
 public void run() {
 next.invokeChannelRegistered();
 }
 });
 }
 return this;
}

private DefaultChannelHandlerContext findContextInbound() {
 DefaultChannelHandlerContext ctx = this;
 do {
        ctx = ctx.next;
 } while (!(ctx.handler() instanceof ChannelInboundHandler));
 return ctx;
}

private void invokeChannelRegistered() {
 try {
 ((ChannelInboundHandler) handler()).channelRegistered(this);
 } catch (Throwable t) {
        notifyHandlerException(t);
 }
}

假如我们通过pipeline的addLast方法添加一个inboundHandler实现。

public class ClientHandler extends ChannelInboundHandlerAdapter {
 @Override 
 public void channelRegistered(ChannelHandlerContext ctx) 
 throws Exception { 
 super.channelRegistered(ctx); 
 System.out.println(" ClientHandler  registered channel "); 
 } 
}

当channel注册完成时会触发pipeline的channelRegistered方法,从head开始遍历,找到ClientHandler,并执行channelRegistered方法。

对于Outbound的操作,则从tail向前遍历,找到实现ChannelOutboundHandler接口的handler,具体实现和Inbound一样。

服务启动过程中,ServerBootstrap在init方法中,会给ServerSocketChannel的pipeline添加ChannelInitializer对象,其中ChannelInitializer继承ChannelInboundHandlerAdapter,并实现了ChannelInboundHandler接口,所以当ServerSocketChannel注册到selector之后,会触发其channelRegistered方法。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

public void initChannel(Channel ch) throws Exception {
 ChannelPipeline pipeline = ch.pipeline();
 ChannelHandler handler = handler();
 if (handler != null) {
        pipeline.addLast(handler);
 }
    pipeline.addLast(new ServerBootstrapAcceptor(
            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}

在initChannel实现中,添加ServerBootstrapAcceptor实例到pipeline中。

ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,负责把接收到的客户端socketChannel注册到childGroup中,由childGroup中的eventLoop负责数据处理。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
 final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

 for (Entry<ChannelOption<?>, Object> e: childOptions) {
 try {
 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
 }
 } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
 }
 }

 for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
 }

 try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
 @Override
 public void operationComplete(ChannelFuture future) throws Exception {
 if (!future.isSuccess()) {
                    forceClose(child, future.cause());
 }
 }
 });
 } catch (Throwable t) {
        forceClose(child, t);
 }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

司法的过程

司法的过程

(美)亨利·J.亚伯拉罕 / 泮伟江 宦盛奎 韩阳 / 北京大学出版社 / 2009-07-28 / 58.00元

本书是以比较研究的方法来分析司法哲学的经典文本之一。作者以敏锐的眼光透视了司法过程背后的理论、实践和参与其中的人。比较了美国、英国、法国的具体法院运作,审视了“司法能动主义”和“司法克制主义”之间的争辩。本书第七版的介绍吸收了美国、英国、法国和欧洲法院体系运作中的最新和重要的发展。 目前国内非常关注司法的运作过程、法官的裁判过程,此书的翻译对于这方面的研究很有助益,对于英国和法国法院的介绍填补了国......一起来看看 《司法的过程》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

MD5 加密
MD5 加密

MD5 加密工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具