Netty 源码解析系列-客户端连接接入及读I/O解析

栏目: Java · 发布时间: 6年前

内容简介:根据k.attachment()获取附加的对象,那我们是在哪里附加上去的呢?上一篇

上一章节《Netty 源码解析系列-服务端启动流程解析》 我们完成了服务端启动,那么服务端启动完成后,客户端接入以及读 I/O 事件是怎么哪里开始的?以及 nettyboss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池?带着这些疑问,我们开始客户端连接接入及读写 I/O 解析。

1.NioEventLoop run()开始

processSelectedKeys();
复制代码
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
复制代码

根据 selectedKeys 是否为空,判断是否采用优化后的 selectedKeys ,进到 processSelectedKeysOptimized

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
               processSelectedKey(k, (AbstractNioChannel) a);
        } else {
               @SuppressWarnings("unchecked")
               NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
               processSelectedKey(k, task);
        }
            ...
}
}
复制代码

k.attachment()获取附加的对象,那我们是在哪里附加上去的呢?上一篇 《Netty 源码解析-服务端启动流程解析》 注册时 attach 上去的对象,其实就是 NioServerSocketChannel 自身。

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
	...
	selectionKey = javaChannel().register(eventLoop().selector, 0, this);
	...        
    }
}
复制代码

我们再回到 k.attachment() ,在取出附加对象后,判断类型是否为 AbstractNioChannel ,从这里我们可以看到,不是附加 AbstractNioChannel 类型,那么就是附加的 NioTask 对象,在这里我们只看关于 AbstractNioChannel 的,进到 processSelectedKey() 方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    ...
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
    if (!ch.isOpen()) {
         return;
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
          ch.unsafe().forceFlush();
    }
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
          int ops = k.interestOps();
          ops &= ~SelectionKey.OP_CONNECT;
          k.interestOps(ops);
          unsafe.finishConnect();
    }
    ...
}
复制代码

当操作类型是读操作或者连接操作,进入 unsafe.read() ,有两个类实现了这个方法,一个是 AbstractNioByteChannel 的内部类 NioByteUnsafe ,一个是 AbstractNioMessageChannel 的内部类 NioMessageUnsafe ,这两个类都是 NioUnsafe 实现类 AbstractNioChannel 的子类,那到底是哪一个子类?我们看看 NioServerSocketChannel 创建时是创建的 NioByteUnsafe 还是 NioMessageUnsafe

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
        public NioServerSocketChannel() {
                this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
}
复制代码
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
          super(parent, ch, readInterestOp);
      }
}
复制代码
public abstract class AbstractNioChannel extends AbstractChannel {
	protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    		super(parent);
	}
}

复制代码
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected AbstractChannel(Channel parent) {
                this.parent = parent;
                unsafe = newUnsafe();
                pipeline = new DefaultChannelPipeline(this);
        }
}
复制代码

NioServerSocketChannel是 AbstractNioMessageChannel 的子类, AbstractNioMessageChannelAbstractNioChannel 的子类, newUnsafe()AbstractChannel 的抽象方法,那么我们从这里就知道, AbstractNioMessageChannel 实现了 AbstractChannel的newUnsafe() 抽象方法,由此判断,我们选择 AbstractNioMessageChannel 的内部类 NioMessageUnsaferead()

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();
    @Override
    public void read() {
        ...
        for (;;) {
           int localRead = doReadMessages(readBuf);
           ...
    }
    setReadPending(false);
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
    ...
}
复制代码

这里分两部分,一个是处理消息,一个是处理事件。

1.处理消息

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ...
    buf.add(new NioSocketChannel(this, ch));
    return 1;
    ...
}
复制代码

接受了一个客户端 SocketChannel ,封装到 NioSocketChannel ,添加到 list 集合中,我们看看 new NioSocketChannel()

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
	public NioSocketChannel(Channel parent, SocketChannel socket) {
    		super(parent, socket);
    		config = new NioSocketChannelConfig(this, socket.socket());
	}
}
复制代码
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
	protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    		super(parent, ch, SelectionKey.OP_READ);
	}

    @Override
    protected AbstractNioUnsafe newUnsafe() {
    	return new NioByteUnsafe();
    }

    protected class NioByteUnsafe extends AbstractNioUnsafe {
	    @Override
	    public final void read() {
		    ...
	    }
    }
}
复制代码

AbstractNioByteChannel也继承了 AbstractNioChannel ,并实现了 newUnsafe() 方法,由此我们可以推断出当客户端第一次连接时,走的是 AbstractNioMessageChannel 的子类 NioMessageUnsafe的read() ,当客户端发送数据时,走的是 AbstractNioByteChannel 的内部类 AbstractNioUnsaferead() 方法。

2.处理事件

for (int i = 0; i < size; i ++) {
    	   pipeline.fireChannelRead(readBuf.get(i));
     }

复制代码
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
复制代码
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
复制代码

Netty 源码解析系列-客户端连接接入及读I/O解析
nextdebug 可以看出,当前 handlerServerBootstrapAcceptor 这个处理器来处理 ChannelRead() 方法,如果看了 上一篇《Netty 源码解析-服务端启动流程解析》 就会知道,这是在 init() 方法中 pipeline.addLast(new ServerBootstrapAcceptor()) 。为什么不是 p.addLast(new ChannelInitializer())? 因为在 ChannelInitializer.channelRegistered() 会删除当前 initChannel

处理器。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}
复制代码

我们继续看 ServerBootstrapAcceptorChannelRead() 方法。

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
       try {
          if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
              logger.warn("Unknown channel option: " + e);
          }
        } catch (Throwable t) {
              logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               if (!future.isSuccess()) {
                   forceClose(child, future.cause());
                }
            }
        });
     } catch (Throwable t) {
           forceClose(child, t);
     }
}
复制代码

这里分三个步骤

(1)将 childHandler 添加到处理器上,这个从哪里来?就是从最开始设置 serverBootstrap.childHandler(new IOChannelInitialize())

(2)设置一些参数。

(3) work线程池 register 客户端的 channel

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

复制代码
@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
复制代码
@Override
public EventExecutor next() {
    return chooser.next();
}
复制代码
private final class GenericEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
}
复制代码

work 线程池选一个线程来执行 register

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}
复制代码
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
	 ...
        channel.unsafe().register(this, promise);
        return promise;
}
复制代码
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
	 ...
     AbstractChannel.this.eventLoop = eventLoop;
     if (eventLoop.inEventLoop()) {
     register0(promise);
     } else {
          try {
              eventLoop.execute(new OneTimeTask() {
              @Override
              public void run() {
                 register0(promise);
              }
              });
           } catch (Throwable t) {
	            ...
           }
     }
}
复制代码
@Override
protected void doRegister() throws Exception {
	...
	selectionKey = javaChannel().register(eventLoop().selector, 0, this);
	...
}
复制代码

后面的流程和 上一篇《Netty 源码解析-服务端启动流程解析》 的注册流程是一样的,区别在于服务启动时注册是在 boss 线程池任务队列中执行注册,客户端新接入注册是在 work 线程池任务队列中执行 register0() 方法,并将 work 线程池的 selector 注册到 Java NIO 到这里,我们就可以回答开篇的的几个问题:客户端是如何接入? nettyboss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池? 现在我们还剩下一个问题:读写 I/O 事件是怎么哪里开始的?

我们回到文章开头

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
               processSelectedKey(k, (AbstractNioChannel) a);
        } else {
               @SuppressWarnings("unchecked")
               NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
               processSelectedKey(k, task);
        }
             ...
    } 
}
复制代码

前面 boss 线程池在这里完成了客户端连接接入,并将链路注册到 worker 线程池任务队列,添加了 read 事件的监听,那么现在 work 线程不停循环 selectedKeys 中有没有待处理的事件,当有待处理事件,那么会执行 processSelectedKey() 方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	...
	int readyOps = k.readyOps();
	if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    		unsafe.read();
    		...
	}
	...
}
复制代码

在这里 unsafe.read() 选择 AbstractNioByteChannelread()

@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
       this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
       int totalReadAmount = 0;
       boolean readPendingReset = false;
       do {
          byteBuf = allocHandle.allocate(allocator);
          int writable = byteBuf.writableBytes();
          int localReadAmount = doReadBytes(byteBuf);
          if (localReadAmount <= 0) {
           // not was read release the buffer
              byteBuf.release();
              byteBuf = null;
              close = localReadAmount < 0;
              break;
           }
          if (!readPendingReset) {
               readPendingReset = true;
               setReadPending(false);
          }
          pipeline.fireChannelRead(byteBuf);
          byteBuf = null;

          if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
               totalReadAmount = Integer.MAX_VALUE;
               break;
          }
          totalReadAmount += localReadAmount;

          if (!config.isAutoRead()) {
               break;
          }

          if (localReadAmount < writable) {
              break;
          }
       } while (++ messages < maxMessagesPerRead);
         pipeline.fireChannelReadComplete();
         allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
     } catch (Throwable t) {
          handleReadException(pipeline, byteBuf, t, close);
     } finally {
         if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
          }
     }
    }
}
复制代码

把这一大段代码分解成几部分

1.设置循环读,16次,未读完则会等到下一轮 select 继续读取, maxMessagesPerRead 默认等于16。

2.获取缓存操作 handlerconfig.getRecvByteBufAllocator().newHandle()

3.申请缓存空间, allocHandle.allocate(allocator)

4.从 socket 中读取数据到 byteBuf 中。

5.传递读事件到下一个 handler 处理器。

6.读完之后发送读完时间到下一个 handler 处理器 我们只看读事件,其他细节后面的文章再详细解析。

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
复制代码
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
 } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
复制代码
Netty 源码解析系列-客户端连接接入及读I/O解析
Handler 事件顺序是 HeadContextHandler --> IdleStateHandler -->IOHandler --> TailContext
private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
复制代码

进到 IdleStateHandler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}
复制代码

设置读事件为true,为后面状态检测做准备,继续向下传递读事件,这次是 IOHandler 的读事件。

Netty 源码解析系列-客户端连接接入及读I/O解析
public class IOHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        System.out.println(msg.toString());
    }
	...
}
复制代码

交给用户自定义 handler 处理读事件,自此读 I/O 事件是怎么哪里开始,如何交给用户 handler 处理已解析完毕。

总结:

1.boss线程处理 NioServerSocketChannelaccept 事件,并将客户端添加到 work 任务队列,任务队列执行 redister0() 方法, 将 read 事件注册到 work 线程的 selector

2.work线程轮询 selectkeys ,当有事件上来时,将缓存数据发送到用户 handler


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复杂网络理论及其应用

复杂网络理论及其应用

汪小帆、李翔、陈关荣 / 清华大学出版社 / 2006 / 45.00元

国内首部复杂网络专著 【图书目录】 第1章 引论 1.1 引言 1.2 复杂网络研究简史 1.3 基本概念 1.4 本书内容简介 参考文献 第2章 网络拓扑基本模型及其性质 2.1 引言 2.2 规则网络 2.3 随机图 2.4 小世界网络模型 2.5 无标度网络模型 ......一起来看看 《复杂网络理论及其应用》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试