netty-bootstrap

Category: Programming tools · Published: 5 years ago

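Bootstrap is the builder utility that Netty provides for creating server-side and client-side code. It holds the configuration needed to start a server or a client. The code below gives a first look.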

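import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Create the ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// Create the I/O thread pools used to accept connections and to dispatch I/O events
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    // Set the server channel type to NioServerSocketChannel
    bootstrap.channel(NioServerSocketChannel.class)
            // Set the I/O thread pools
            .group(bossGroup, workerGroup)
            // Set the handler for newly accepted connections
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // For each new connection, add an EchoHandler to its ChannelPipeline
                    // (EchoHandler is the application's own handler, not shown here)
                    ch.pipeline().addLast(new EchoHandler());
                }
            });

    // Bind to port 8090
    ChannelFuture bind = bootstrap.bind(8090);
    // Most operations in Netty are asynchronous, so bind returns a future immediately;
    // sync() blocks until the bind has succeeded
    bind.sync();
    // Block the current thread until the server channel is closed
    bind.channel().closeFuture().sync();
} finally {
    // Shut down the thread pools
    bossGroup.shutdownGracefully().sync();
    workerGroup.shutdownGracefully().sync();
}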

Bootstrap source code analysis

The main classes are:

io.netty.bootstrap.Bootstrap

io.netty.bootstrap.ServerBootstrap

io.netty.bootstrap.AbstractBootstrap

io.netty.bootstrap.AbstractBootstrapConfig

io.netty.bootstrap.ServerBootstrapConfig

io.netty.bootstrap.BootstrapConfig

AbstractBootstrap

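AbstractBootstrap offers a convenient way to configure a Channel through method chaining.

To follow the code, it helps to know the Hierarchical Builder pattern, which shows up in a lot of code but can look hard to read at first.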

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    ...
    /**
     * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
     * {@link Channel}
     */
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    @SuppressWarnings("unchecked")
    private B self() {
        return (B) this;
    }
    ...
}
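Here the configuration methods of AbstractBootstrap return an object of type B, so they can be chained with other methods that also return B. All of these methods end by calling B self(), which returns this. Because the static type of that this is the subtype, a subclass can use the result directly without casting from the parent type down to the subtype.

When ServerBootstrap extends AbstractBootstrap and sets B to itself, it can easily add further configuration methods on top of those in AbstractBootstrap.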

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }
    ...
}

Here ServerBootstrap adds a group(parentGroup, childGroup) method on top of what AbstractBootstrap provides. This style therefore works well for multi-level builders.
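The self-typed generic parameter is easier to see in isolation. The following is a minimal sketch of the pattern using only the JDK; BaseBuilder, ServerBuilder, and their methods are hypothetical names chosen for illustration and are not part of Netty.

```java
// Hypothetical classes for illustration only; this is not Netty code.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    protected String name;

    // Returns B rather than BaseBuilder, so subclass methods stay reachable in a chain.
    public B name(String name) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        this.name = name;
        return self();
    }

    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }
}

final class ServerBuilder extends BaseBuilder<ServerBuilder> {
    private int port;

    // A subclass-only setting, added on top of the inherited ones.
    public ServerBuilder port(int port) {
        this.port = port;
        return this;
    }

    public String describe() {
        return name + ":" + port;
    }
}

class HierarchicalBuilderDemo {
    public static void main(String[] args) {
        // name() is declared in the base class but returns ServerBuilder,
        // so port() can follow it without a cast.
        System.out.println(new ServerBuilder().name("echo").port(8090).describe());
    }
}
```

If name() returned BaseBuilder instead of B, the call to port() in the chain would not compile without a cast.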

AbstractBootstrapConfig

AbstractBootstrapConfig exposes the configuration of an AbstractBootstrap to the outside.

public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {

    protected final B bootstrap;

    protected AbstractBootstrapConfig(B bootstrap) {
        this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
    }

    /**
     * Returns the configured local address or {@code null} if non is configured yet.
     */
    public final SocketAddress localAddress() {
        return bootstrap.localAddress();
    }
    ...
}

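AbstractBootstrapConfig exposes the basic configuration that every AbstractBootstrap has, and each concrete subclass can expose the configuration of its own AbstractBootstrap subtype.

For example, in ServerBootstrapConfig B is ServerBootstrap, so it can also return values from ServerBootstrap-specific methods such as childGroup.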

public final class ServerBootstrapConfig extends AbstractBootstrapConfig<ServerBootstrap, ServerChannel> {

    ServerBootstrapConfig(ServerBootstrap bootstrap) {
        super(bootstrap);
    }

    /**
     * Returns the configured {@link EventLoopGroup} which will be used for the child channels or {@code null}
     * if non is configured yet.
     */
    @SuppressWarnings("deprecation")
    public EventLoopGroup childGroup() {
        return bootstrap.childGroup();
    }
    ...
}
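The relationship between a builder and its config object can be sketched as a read-only view that stores nothing itself and delegates every getter to the builder. The sketch below uses only the JDK; ConnectionBuilder and ConnectionBuilderConfig are hypothetical names and the fields are made up for illustration.

```java
import java.util.Objects;

// Hypothetical builder, standing in for the role a bootstrap plays.
final class ConnectionBuilder {
    private String host = "localhost";
    private int port;
    private final ConnectionBuilderConfig config = new ConnectionBuilderConfig(this);

    ConnectionBuilder host(String host) {
        this.host = Objects.requireNonNull(host, "host");
        return this;
    }

    ConnectionBuilder port(int port) {
        this.port = port;
        return this;
    }

    // Package-private getters: outside code reads them through the config view.
    String host() { return host; }
    int port() { return port; }

    ConnectionBuilderConfig config() { return config; }
}

// Read-only view: holds no state of its own and delegates every getter.
final class ConnectionBuilderConfig {
    private final ConnectionBuilder builder;

    ConnectionBuilderConfig(ConnectionBuilder builder) {
        this.builder = Objects.requireNonNull(builder, "builder");
    }

    public String host() { return builder.host(); }
    public int port() { return builder.port(); }
}
```

Because the view delegates instead of copying, it always reflects the builder's current settings.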

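The ServerBootstrap bind process

After a ServerBootstrap has been configured, the last step is usually bind.

On the server side, bind creates a local ServerSocket bound to the given port and sets up how newly accepted sockets are handled.

The bind process in detail is as follows.

In AbstractBootstrap: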

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

private ChannelFuture doBind(final SocketAddress localAddress) {
     final ChannelFuture regFuture = initAndRegister();
     final Channel channel = regFuture.channel();
     if (regFuture.cause() != null) {
         return regFuture;
     }

     if (regFuture.isDone()) {
         // At this point we know that the registration was complete and successful.
         ChannelPromise promise = channel.newPromise();
         doBind0(regFuture, channel, localAddress, promise);
         return promise;
     } else {
         // Registration future is almost always fulfilled already, but just in case it's not.
         final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
         regFuture.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 Throwable cause = future.cause();
                 if (cause != null) {
                     // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                     // IllegalStateException once we try to access the EventLoop of the Channel.
                     promise.setFailure(cause);
                 } else {
                     // Registration was successful, so set the correct executor to use.
                     // See https://github.com/netty/netty/issues/2586
                     promise.registered();

                     doBind0(regFuture, channel, localAddress, promise);
                 }
             }
         });
         return promise;
     }
 }
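The two branches of doBind follow a common shape in asynchronous code: act at once if the prerequisite future has already completed, otherwise attach a listener and act later. A minimal sketch of that shape with the JDK's CompletableFuture follows. BindAfterRegistration and bindWhenRegistered are hypothetical names, and the string result merely stands in for a real bind; this is not Netty's implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration only; not Netty code.
final class BindAfterRegistration {
    // Returns a future that completes with "bound:<port>" once registration has
    // succeeded, or fails with the registration error.
    static CompletableFuture<String> bindWhenRegistered(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Fast path: registration already finished successfully.
            promise.complete("bound:" + port);
        } else {
            // Slow path: wait for registration, then bind or propagate the failure.
            // If registration has already failed, the callback runs immediately.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound:" + port);
                }
            });
        }
        return promise;
    }
}
```

In both branches the caller gets a future back right away, which is why the calling code still needs something like sync() to wait for the outcome.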

initAndRegister

  • Creates the Channel through the ChannelFactory
  • Initializes the Channel by calling init
  • Registers the Channel with the EventLoopGroup
final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         channel = channelFactory.newChannel();
         init(channel);
     } catch (Throwable t) {
         if (channel != null) {
             // channel can be null if newChannel crashed (eg SocketException("too many open files"))
             channel.unsafe().closeForcibly();
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
         }
         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
     }

     ChannelFuture regFuture = config().group().register(channel);
     if (regFuture.cause() != null) {
         if (channel.isRegistered()) {
             channel.close();
         } else {
             channel.unsafe().closeForcibly();
         }
     }

     // If we are here and the promise is not failed, it's one of the following cases:
     // 1) If we attempted registration from the event loop, the registration has been completed at this point.
     // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
     // 2) If we attempted registration from the other thread, the registration request has been successfully
     // added to the event loop's task queue for later execution.
     // i.e. It's safe to attempt bind() or connect() now:
     // because bind() or connect() will be executed *after* the scheduled registration task is executed
     // because register(), bind(), and connect() are all bound to the same thread.

     return regFuture;
 }
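The comment at the end of initAndRegister relies on one property: tasks handed to a single thread run in the order they were submitted, so a bind queued after a registration cannot overtake it. A small sketch of that property using a plain JDK single-thread executor follows. SingleThreadOrdering is a hypothetical name, and the executor only approximates an event loop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a JDK executor stands in for an event loop here.
final class SingleThreadOrdering {
    static List<String> runRegisterThenBind() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            // Both tasks are queued from the calling thread, in this order.
            loop.execute(() -> log.add("register"));
            loop.execute(() -> log.add("bind"));
        } finally {
            // shutdown() still lets the already queued tasks run.
            loop.shutdown();
        }
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }
}
```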

A ServerBootstrap configured with ServerBootstrap.channel(NioServerSocketChannel.class)

uses ReflectiveChannelFactory, which creates the Channel object reflectively through the constructor of the given class.
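The idea of a reflective factory can be sketched with the JDK alone: look up the no-argument constructor once, then invoke it each time an instance is needed. ReflectiveFactory below is a hypothetical class written for illustration; it is not the source of Netty's ReflectiveChannelFactory.

```java
import java.lang.reflect.Constructor;

// Hypothetical class for illustration only; not Netty's ReflectiveChannelFactory.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> type) {
        try {
            // Look up the public no-argument constructor once, up front.
            this.constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + type.getName() + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create an instance of " + constructor.getDeclaringClass().getName(), e);
        }
    }
}
```

For example, new ReflectiveFactory<StringBuilder>(StringBuilder.class).newInstance() yields a fresh, empty StringBuilder on each call.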

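ServerBootstrap's init method applies the attr, option, and other settings configured on the Bootstrap to the Channel.

It then adds a ChannelInitializer to the current ServerChannel. Its initChannel method is called once the Channel has been registered.

initChannel adds the handler configured on the Bootstrap to the Channel and then adds a ServerBootstrapAcceptor to the pipeline.

ServerBootstrapAcceptor handles the new socket connections that the ServerSocket accepts.

Note that adding the ServerBootstrapAcceptor to the pipeline is not done inline; it is submitted as a task to the channel's eventLoop.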

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

ServerBootstrapAcceptor

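It first adds the configured childHandler to the accepted Channel (the child) and applies the configured ChannelOption and attribute values.

It then registers the child with childGroup, which corresponds to workerGroup in the example code above.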

public void channelRead(ChannelHandlerContext ctx, Object msg) {
  final Channel child = (Channel) msg;

  child.pipeline().addLast(childHandler);

  setChannelOptions(child, childOptions, logger);

  for (Entry<AttributeKey<?>, Object> e: childAttrs) {
      child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  }

  try {
      childGroup.register(child).addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
              if (!future.isSuccess()) {
                  forceClose(child, future.cause());
              }
          }
      });
  } catch (Throwable t) {
      forceClose(child, t);
  }
}

Registering with the EventLoopGroup

ChannelFuture regFuture = config().group().register(channel);

NioEventLoopGroup extends MultithreadEventLoopGroup. Its next() method picks a NioEventLoop in round-robin fashion,

the Channel's eventLoop is then set to that NioEventLoop, and events such as channelRegistered are fired.

MultithreadEventLoopGroup

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    // Defined in the parent class MultithreadEventExecutorGroup
    public EventExecutor next() {
        return chooser.next();
    }

The chooser picks one of the EventExecutors to be returned by next().

public EventExecutorChooser newChooser(EventExecutor[] executors) {
       if (isPowerOfTwo(executors.length)) {
           return new PowerOfTwoEventExecutorChooser(executors);
       } else {
           return new GenericEventExecutorChooser(executors);
       }
   }
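The reason for having two choosers is that round-robin over a power-of-two number of entries can use a bit mask instead of a modulo. A minimal sketch of both variants using only the JDK follows. RoundRobinChoosers and its Chooser interface are hypothetical names, and the generic branch uses Math.floorMod as one way to stay non-negative after the counter overflows; Netty's own classes may differ in detail.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical classes for illustration only; not Netty's chooser implementation.
final class RoundRobinChoosers {
    interface Chooser<T> {
        T next();
    }

    // True when exactly one bit is set (callers must rule out zero separately).
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static <T> Chooser<T> newChooser(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("items must not be empty");
        }
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(items.length)) {
            // Power-of-two length: masking with (length - 1) replaces the modulo
            // and stays in range even after the counter wraps to negative values.
            return () -> items[idx.getAndIncrement() & (items.length - 1)];
        }
        // General case: floorMod keeps the index non-negative after overflow.
        return () -> items[Math.floorMod(idx.getAndIncrement(), items.length)];
    }
}
```

With three items the chooser cycles through indices 0, 1, 2, 0, and so on; with four items the mask variant produces the same cycle without a division.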

NioEventLoop extends SingleThreadEventLoop:

public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

AbstractChannel

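The register method here does the following:

it sets the Channel's eventLoop and fires the channelRegistered event; if the Channel is already active and this is its first registration, it also fires the channelActive event.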

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
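The inEventLoop() check in register is a general idiom: run the work inline when the caller is already on the loop thread, otherwise queue it for that thread. The sketch below shows the idiom with a JDK single-thread executor. SingleThreadLoop and runOnLoop are hypothetical names, and the class is far simpler than a real Netty EventLoop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class for illustration only; much simpler than a Netty EventLoop.
final class SingleThreadLoop {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    SingleThreadLoop() {
        executor = Executors.newSingleThreadExecutor(task -> {
            Thread thread = new Thread(task, "loop");
            thread.setDaemon(true);
            // Remember the worker thread so inEventLoop() can compare against it.
            loopThread = thread;
            return thread;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task inline when already on the loop thread, otherwise queues it.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Either way the task ends up running on the loop thread, so state touched only by such tasks needs no extra locking.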

Once initAndRegister has completed, doBind0 is executed.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

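doBind0 calls the bind method of AbstractChannel, which in turn calls pipeline.bind.

ChannelPipeline is covered later. It contains a special ChannelHandler, HeadContext, which serves as the head node of the pipeline. Its bind method delegates to unsafe: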

public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

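unsafe is an interface, and each kind of I/O channel has its own implementation.

NioServerSocketChannel

For NioServerSocketChannel the bind ends up in doBind. On Java 7 and later it uses the JDK's ServerSocketChannel.bind method; on earlier versions it obtains the underlying socket and binds that instead.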

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
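The Java 7+ branch can be tried directly against the JDK, without Netty. The sketch below binds a ServerSocketChannel with an explicit backlog and reports the port the operating system assigned. JdkBindSketch is a hypothetical name, and binding to the loopback address on port 0 is a choice made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class using only JDK NIO; not Netty code.
final class JdkBindSketch {
    // Binds a server channel on the loopback address and returns the assigned port.
    static int bindAndReportPort(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindAndReportPort(128));
    }
}
```

The pre-Java 7 equivalent of the bind call is server.socket().bind(address, backlog), which is what the else branch above falls back to.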
