内容简介:阅读和理解本文需要以下技术储备:深入理解现代计算机CPU计算架构,精通至少一门常用编程语言,至少5年web项目经验。好吧,这是我编的,如果这三条你都满足,对不起打扰了,关闭按钮应该在左上角。为什么会突然想到跟你们聊聊Pipeline呢?有一次跟同事一起去门口的拉面馆拉面吃,漫长的等待中,我发现这儿的后厨师傅只有一个人,一个人负责和面拉面、煮熟、加汤、加牛肉、加萝卜、加香菜。。。因为等的时间太长了,所以记得这么清楚[手动捂脸]。我们等在外面就像这样:
阅读和理解本文需要以下技术储备:深入理解现代计算机CPU计算架构,精通至少一门常用编程语言,至少5年web项目经验。好吧,这是我编的,如果这三条你都满足,对不起打扰了,关闭按钮应该在左上角。
为什么会突然想到跟你们聊聊Pipeline呢?有一次跟同事一起去门口的拉面馆拉面吃,漫长的等待中,我发现这儿的后厨师傅只有一个人,一个人负责和面拉面、煮熟、加汤、加牛肉、加萝卜、加香菜。。。因为等的时间太长了,所以记得这么清楚[手动捂脸]。我们等在外面就像这样:
作为一枚程序猿脑子里就蹦出来Pipeline,不要把逻辑都耦合在一起啊,代码又多又难看。我们进去不到10分钟,排队的人已经排到门外面去了,假如把和面拉面交给一个人;煮面交给一个人;加牛肉、加萝卜、加香菜交给另外一个人,老板啊你一天少说多卖100碗。
什么是Pipeline
Pipeline的工作其实就是将一件需要重复做的事情切割成各个不同的阶段,每一个阶段由独立的单元负责,所有待执行的对象依次进入作业队列。业界用到Pipeline的地方很多,例如CPU的计算单元里,linux管道命令里,netty框架里等等,今天我就以netty框架为依托,从为什么要使用Pipeline,netty里的读写事件是如何依托Pipeline传播的两点跟大家聊聊。
为什么要使用Pipeline
netty的使用场景里一个最常见的就是IM系统的开发,而一个请求从客户端发起之后,大概会经历一下几个步骤:
在“一系列逻辑”部分,最先让人想到的处理方式就是根据某个标识,分辨消息类型,用if/else来进行处理,为什么第一反应是if/else呢?这跟我们的思维方式有很大关系,人第一反应想到的都是先干什么,然后再干什么,这样的处理方式在逻辑很简单的情况下既清晰又快捷,可是如果这个过程包括很多复杂的逻辑(加牛肉、加萝卜、加香菜),势必会导致编写逻辑的类越来越臃肿,直到有一天你自己看自己写的代码都头皮发麻。Pipeline的作用就是组织不同处理器,在netty里就相当于它的大动脉,一个请求在生命周期内都是在Pipeline上的不同位置扭转的,这样既保证了逻辑的清晰,又使代码变得优雅。
读写事件在Pipeline中是如何传播的
在聊事件传播在之前先说下Pipeline的结构,Pipeline的结构是双向链表,每个节点上是包装了具体逻辑处理器channelHandler的channelHandlerContext对象,绕死我了,参考下图:
Pipeline会在创建对应的channel时创建,同时会初始化Pipeline的head和tail两个节点,而我们编写的业务逻辑处理器,调用Pipeline的addLast方法会被添加到head节点和tail节点之间。netty的事件从类型上可以分为inbound和outbound事件,我们在编写具体逻辑处理器时也会继承ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter,也就标识了事件处理的方向。在Pipeline的源码里,有个很贴心的图,贴出来给大家回忆一下:
先说结论:inbound事件的传播顺序是和添加顺序正相关,比如添加顺序是A—>B—>C,则netty在处理时的顺序也是A—>B—>C;而outbound事件的传播顺序是和添加顺序逆相关,比如添加顺序是A—>B—>C,则处理顺序是C—>B—>A;下面重点以inbound事件为例讲解。
从ChannelInboundHandlerAdapter开始:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
读事件的传播会调用ctx.fireChannelRead(msg)方法,该方法回调到AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
我们再看看findContextInbound()方法
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound); return ctx;
}
恍然大悟有没有,我们说过Pipeline是双向链表结构,在处理完当前handler后,针对读事件,会去从当前节点开始往后循环,找到下一个inbound事件,执行具体读事件后继续调用fireChannelRead,就是一个递归的过程;
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {//事件在netty线程组里
next.invokeChannelRead(m);
} else {
...
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);//此处会调用到添加的inboundhandler
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
那netty又是怎么判断事件是inbound还是outbound的呢?
我们上面提过,Pipeline的每个节点上是一个channelHandlerContext对象,具体业务逻辑处理器在放入Pipeline之前,会被封装成channelHandlerContext对象,到DefaultChannelPipeline里:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
…
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
…
}
关键代码就是这个newContext操作,可以看到这里传入的handler被封装成一个channelHandlerContext对象,然后再执行addLast0操作,继续往newContext方法里看,来到DefaultChannelHandlerContext里:
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
} this.handler = handler;
}
我们在DefaultChannelHandlerContext的构造器里,看到一个isInbound方法,对,没错,就是这里判断的添加的handler是入还是出,逻辑很简单,一个instanceof关键字搞定。
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
还有个点需要和小伙伴们强调一下,在具体的逻辑处理器中,我们可以有两种主动发起事件传播的方式:
ctx.pipeline().fireChannelRead(msg); ctx.fireChannelRead(msg);
第一种是从头节点开始往后开始查找,而第二种则是从调用方法的当前节点开始查找紧邻的inbound事件,两种方式的区别不言而喻,那如何选择呢?netty这么优秀,怎么会让你自己负责事件传播这种和业务无关的事情呢?实际开发中只需要将自定义handler继承SimpleChannelInboundHandler类,实现channelRead0方法即可,上面说的buf释放,事件传播,在父类里都给帮你干了。
没看过瘾?写长了您也没时间看啊。留个家庭作业,各位看官自己研究下netty里写事件是怎么传播的,异常事件又是怎么传播的吧?提个醒:我们是聊Pipeline的,不要身陷源码细节~
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Algorithms + Data Structures = Programs
Niklaus Wirth / Prentice Hall / 1975-11-11 / GBP 84.95
It might seem completely dated with all its examples written in the now outmoded Pascal programming language (well, unless you are one of those Delphi zealot trying to resist to the Java/.NET dominanc......一起来看看 《Algorithms + Data Structures = Programs》 这本书的介绍吧!