Netty Channel源码分析

栏目: Java · 发布时间: 5年前

内容简介:原文链接:前面,我们大致了解了Netty中的几个核心组件。今天我们就来先来介绍Netty的网络通信组件,用于执行网络I/O操作 ——Netty版本:4.1.30

原文链接: wangwei.one/posts/netty…

前面,我们大致了解了Netty中的几个核心组件。今天我们就来先来介绍Netty的网络通信组件,用于执行网络I/O操作 —— Channel

Netty版本:4.1.30

概述

数据在网络中总是以字节的形式进行流通。我们在进行网络编程时选用何种传输方式编码(OIO、NIO等)决定了这些字节的传输方式。

在没有Netty之前,为了提升系统的并发能力,从OIO切换到NIO时,需要对代码进行大量的重构,因为相应的Java NIO 与 IO API大不相同。而Netty在这些 Java 原生API的基础上做了一层封装,对用户提供了高度抽象而又统一的API,从而让传输方式的切换不在变得困难,只需要直接使用即可,而不需要对整个代码进行重构。

Netty Channel UML

netty channel族如下:

Netty Channel源码分析

整个族群中,AbstractChannel 是最为关键的一个抽象类,从它继承出了AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel、LocalChannel、EmbeddedChannel等类,每个类代表了不同的协议以及相应的IO模型。除了 TCP 协议以外,Netty 还支持很多其他的连接协议,并且每种协议还有 NIO(异步 IO) 和 OIO(Old-IO,即传统的阻塞 IO) 版本的区别. 不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:

  • NioSocketChannel:代表异步的客户端 TCP Socket 连接
  • NioServerSocketChannel:异步的服务器端 TCP Socket 连接
  • NioDatagramChannel:异步的 UDP 连接
  • NioSctpChannel:异步的客户端 Sctp 连接
  • NioSctpServerChannel:异步的 Sctp 服务器端连接
  • OioSocketChannel:同步的客户端 TCP Socket 连接
  • OioServerSocketChannel:同步的服务器端 TCP Socket 连接
  • OioDatagramChannel:同步的 UDP 连接
  • OioSctpChannel:同步的 Sctp 服务器端连接
  • OioSctpServerChannel:同步的客户端 TCP Socket 连接

Channel API

我们先来看下最顶层接口channel 主要的API,常用的如下:

接口名 描述
eventLoop() Channel需要注册到EventLoop的多路复用器上,用于处理I/O事件,通过eventLoop()方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。
pipeline() 返回channel分配的ChannelPipeline
isActive() 判断channel是否激活。激活的意义取决于底层的传输类型。例如,一个Socket传输一旦连接到了远程节点便是活动的,而一个Datagram传输一旦被打开便是活动的
localAddress() 返回本地的socket地址
remoteAddress() 返回远程的socket地址
flush() 将之前已写的数据冲刷到底层Channel上去
write(Object msg) 请求将当前的msg通过ChannelPipeline写入到目标Channel中。注意,write操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush操作才会被写入到Channel中,发送给对方。
writeAndFlush() 等同于调用write()并接着调用flush()
metadate() 熟悉TCP协议的读者可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时时间。是否重用地址等。在Netty中,每个Channel对应一个物理链接,每个连接都有自己的TCP参数配置。所以,Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata()方法就可以获取当前Channel的TCP参数配置。
read() 从当前的Channel中读取数据到第一个inbound缓冲区中,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。读取操作API调用完成后,紧接着会触发ChannelHander.channelReadComplete(ChannelHandlerContext)事件,这样业务的ChannelHandler可以决定是否需要继续读取数据。如果已经有操作请求被挂起,则后续的读操作会被忽略。
close(ChannelPromise promise) 主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功,都可以通过ChannelPromise获取操作结果。该操作会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。
parent() 对于服务端Channel而言,它的父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。
id() 返回ChannelId对象,ChannelId是Channel的唯一标识。

Channel创建

对Netty Channel API以及相关的类有了一个初步了解之后,接下来我们来详细了解一下在Netty的启动过程中Channel是如何创建的。服务端Channel的创建过程,主要分为四个步骤:1)Channel创建;2)Channel初始化;3)Channel注册;4)Channel绑定。

Netty Channel源码分析

我们以下面的代码为例进行解析:

// 创建两个线程组,专门用于网络事件的处理,Reactor线程组
// 用来接收客户端的连接,
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();

// 创建辅助启动类ServerBootstrap,并设置相关配置:
ServerBootstrap b = new ServerBootstrap();
// 设置处理Accept事件和读写操作的事件循环组
b.group(bossGroup, workGroup)
         // 配置Channel类型
        .channel(NioServerSocketChannel.class)
         // 配置监听地址
        .localAddress(new InetSocketAddress(port))
         // 设置服务器通道的选项,设置TCP属性
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
         // 设置建立连接后的客户端通道的选项
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
         // channel属性,便于保存用户自定义数据
        .attr(AttributeKey.newInstance("UserId"), "60293")
    	.handler(new LoggingHandler(LogLevel.INFO))
        // 设置子处理器,主要是用户的自定义处理器,用于处理IO网络事件
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(serverHandler);
            }
        });

// 调用bind()方法绑定端口,sync()会阻塞等待处理请求。这是因为bind()方法是一个异步过程,会立即返回一个ChannelFuture对象,调用sync()会等待执行完成
ChannelFuture f = b.bind().sync();
// 获得Channel的closeFuture阻塞等待关闭,服务器Channel关闭时closeFuture会完成
f.channel().closeFuture().sync();
复制代码

调用channel()接口设置 AbstractBootstrap 的成员变量 channelFactory ,该变量顾名思义就是用于创建channel的工厂类。源码如下:

...

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    // 创建 channelFactory
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

...

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    this.channelFactory = channelFactory;
    return (B) this;
}

...

复制代码

channelFactory 设置为 ReflectiveChannelFactory ,在我们这个例子中 clazz 为 NioServerSocketChannel ,我们可以看到其中有个 newChannel() 接口,通过反射的方式来调用,这个接口的调用处我们后面会介绍到。源码如下:

// Channel工厂类
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
	
    @Override
    public T newChannel() {
        try {
            // 通过反射来进行常见Channel实例
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
复制代码

接下来我们来看下 NioServerSocketChannel 的构造函数,主要就是:

  • 生成ServerSocketChannel对象。NioServerSocketChannel创建时,首先使用SelectorProvider的openServerSocketChannel打开服务器套接字通道。SelectorProvider是Java的NIO提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的Provider:如果 LInux 内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider。
  • 设置 ServerSocketChannelConfig 成员变量。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        // 调用JDK底层API生成 ServerSocketChannel 对象实例
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

private final ServerSocketChannelConfig config;

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 调用 AbstractNioChannel 构造器,创建 NioServerSocketChannel,设置SelectionKey为ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 创建ChannleConfig对象,主要是TCP参数配置类
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

AbstractNioChannel 的构造器如下:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 调用 AbstractChannel 构造器
    super(parent);
    this.ch = ch;
    // 从上一步过来,这里设置为 SelectionKey.OP_ACCEPT
    this.readInterestOp = readInterestOp;
    try {
        // 设置为非阻塞状态
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
复制代码

AbstractChannel 构造器中,会设Channel关联的三个核心对象:ChannelId、ChannelPipeline、Unsafe。

  • 初始化ChannelId,ChannelId是一个全局唯一的值;
  • 创建 NioMessageUnsafe 实例,该类为Channel提供了用于完成网络通讯相关的底层操作,如connect(),read(),register(),bind(),close()等;
  • 为Channel创建DefaultChannelPipeline,初始事件传播管道。关于Pipeline的分析,请看后文 的分析。
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 设置ChannelId
    id = newId();
    // 设置Unsafe
    unsafe = newUnsafe();
    // 设置Pipeline
    pipeline = newChannelPipeline();
}
复制代码

从 NioServerSocketChannelConfig 的构造函数追溯下去,在 DefaultChannelConfig 会设置channel成员变量。

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    // 绑定channel
    this.channel = channel;
}
复制代码

以上就是channel创建的过程,总结一下:

  • 通过 ReflectiveChannelFactory 工厂类,以反射的方式对channel进行创建;
  • channel创建的过程中,会创建四个重要的对象:ChannelId、ChannelConfig、ChannelPipeline、Unsafe。

Channel初始化

主要分为以下两步:

  • 将启动器(Bootstrap)设置的选项和属性设置到NettyChannel上面
  • 向Pipeline添加初始化Handler,供注册后使用

我们从 AbstractBootstrap 的 bind() 接口进去,调用链:bind() —> doBind(localAddress) —> initAndRegister() —> init(Channel channel),我们看下 ServerBootstrap 中 init() 接口的实现:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 调用Channel工程类的newChannel()接口,创建channel,就是前面我们讲的部分内容
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        ....
}
复制代码

初始化Channel,我们来重点看下 init(channel) 接口:

void init(Channel channel) throws Exception {
    // 获取启动器 启动时配置的option参数,主要是TCP的一些属性
    final Map<ChannelOption<?>, Object> options = options0();
    // 将获得到 options 配置到 ChannelConfig 中去
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 获取 ServerBootstrap 启动时配置的 attr 参数
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    // 配置 Channel attr,主要是设置用户自定义的一些参数
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
	
    // 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline
    ChannelPipeline p = channel.pipeline();

    // 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup
    final EventLoopGroup currentChildGroup = childGroup;
    // 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    // 保存用户设置的 childOptions 到局部变量 currentChildOptions
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    // 保存用户设置的 childAttrs 到局部变量 currentChildAttrs
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取启动器上配置的handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 添加 handler 到 pipeline 中
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor
                    // 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去
                    // 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
复制代码

对于新连接接入器 ServerBootstrapAcceptor 的分析 ,请查看后文

Channel注册

在channel完成创建和初始化之后,接下来就需要将其注册到事件轮循器Selector上去。我们回到 initAndRegister 接口上去:

final ChannelFuture initAndRegister() {

    ...

    // 获取 EventLoopGroup ,并调用它的 register 方法来注册 channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
复制代码

最终会向下调用到 SingleThreadEventLoop 中的 register 接口:

如何调用到这里,里面的细节需要等到后面文章讲到 MultithreadEventExecutorGroup 再详细说明

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 调用unsafe的register接口
    promise.channel().unsafe().register(this, promise);
    return promise;
}
复制代码

代码跟踪下去,直到 AbstractChannel 中的 AbstractUnsafe 这个类中的 register 接口。

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 将该Channel与eventLoop 进行绑定,后续与该channel相关的IO操作都由eventLoop来处理
    AbstractChannel.this.eventLoop = eventLoop;
	// 初次注册时 eventLoop.inEventLoop() 返回false
    if (eventLoop.inEventLoop()) {
        // 调用实际的注册接口register0
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 调用实际的注册接口register0
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
复制代码

register0接口主要分为以下三段逻辑:

  • doRegister();

  • pipeline.invokeHandlerAddedIfNeeded();

  • pipeline.fireChannelRegistered();

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 调用 doRegister() 接口
        doRegister();
        neverRegistered = false;
        registered = true;
        
       	// 通过pipeline的传播机制,触发handlerAdded事件
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        // 通过pipeline的传播机制,触发channelRegistered事件
        pipeline.fireChannelRegistered();
        // 还没有绑定,所以这里的 isActive() 返回false.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
复制代码

我们来看 AbstractNioChannel 中的 doRegister()接口,最终调用的就是Java JDK底层的NIO API来注册。

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // eventLoop().unwrappedSelector():获取selector,将在后面介绍 EventLoop 创建时会讲到
            // 将selector注册到Java NIO Channel上
            // ops 设置为 0,表示不关心任何事件
            // att 设置为 channel自身,表示后面还会将channel取出来用作它用(后面文章会讲到)
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
复制代码

Channel绑定

在完成创建、初始化以及注册之后,接下来就是Channel绑定操作。

本小节涉及到的pipeline事件传播机制,我们放到后面的文章中去讲解。

从启动器的bind()接口开始,往下调用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}
复制代码

doBind 方法又会调用 doBind0() 方法,在doBind0()方法中会通过EventLoop去执行channel的bind()任务,关于EventLoop的execute接口的分析,请看后面的文章 。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 调用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
复制代码

doBind0() 方法往下会条用到 pipeline.bind(localAddress, promise); 方法,通过pipeline的传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法,这个方法主要做两件事情:

  • 调用doBind():调用底层JDK API进行Channel的端口绑定。
  • 调用pipeline.fireChannelActive():

关于Pipeline的传播机制,请看后文

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在绑定成功前为 false
    boolean wasActive = isActive();
    try {
        // 调用doBind()调用JDK底层API进行端口绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
	// 完成绑定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
复制代码

我们这里看服务端 NioServerSocketChannel 实现的 doBind方法,最终会调用JDK 底层 NIO Channel的bind方法:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
复制代码

调用 pipeline.fireChannelActive(),开始传播active事件,pipeline首先就会调用HeadContext节点进行事件传播,会调用到 DefaultChannelPipeline.HeadContext.channelActive() 方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 触发heanlder 的 ChannelActive 方法
    ctx.fireChannelActive();
    // 调用接口readIfIsAutoRead
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        // 调用channel.read()
        channel.read();
    }
}
复制代码

channel.read() 方法往下会调用到 AbstractChannelHandlerContext.read() 方法:

@Override
public ChannelHandlerContext read() {
    // 获取下一个ChannelHandlerContext节点
    final AbstractChannelHandlerContext next = findContextOutbound();
    // 获取EventExecutor
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 调用下一个节点的invokeRead接口
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}
复制代码

通过pipeline的事件传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.beginRead() 方法:

@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        // 调用 doBeginRead();
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
复制代码

我们看下 AbstractNioChannel 对doBeginRead接口的实现逻辑:

// 注册一个OP_ACCEPT
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    // 获取channel注册是的设置的 selectionKey
    final SelectionKey selectionKey = this.selectionKey;
    // selectionKey无效则返回
    if (!selectionKey.isValid()) {
        return;
    }
	
    readPending = true;
	// 前面讲到channel在注册的时候,这是 interestOps 设置的是 0
    final int interestOps = selectionKey.interestOps();
    // readInterestOp 在前面讲到channel创建的时候,设置值为 SelectionKey.OP_ACCEPT
    if ((interestOps & readInterestOp) == 0) {
        // 最终 selectionKey 的兴趣集就会设置为 SelectionKey.OP_ACCEPT
        // 表示随时可以接收新连接的接入
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
复制代码

总结

至此,我们就分析完了Channel的创建、初始化、注册、绑定的流程。其中涉及到的EventLoopGroup和Pipeline事件传播机制的知识点,我们放到后面的文章中去讲解。

参考资料


以上所述就是小编给大家介绍的《Netty Channel源码分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

高效算法

高效算法

[法] Christoph Dürr、[法] Jill-Jênn Vie / 史世强 / 人民邮电出版社 / 2018-5 / 55.00元

本书旨在探讨如何优化算法效率,详细阐述了经典算法和特殊算法的实现、应用技巧和复杂度验证过程,内容由浅入深,能帮助读者快速掌握复杂度适当、正确率高的高效编程方法以及自检、自测技巧,是参加ACM/ICPC、Google Code Jam 等国际编程竞赛、备战编程考试、提高编程效率、优化编程方法的参考书目。一起来看看 《高效算法》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换