kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

栏目: 后端 · 发布时间: 5年前

内容简介:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。}

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

1 Reactor单线程案例代码热热身

  • 如下是单线程的JAVA NIO编程模型。

  • 首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。

  • 客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。

  • 这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是......

  • Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.

  • 优化点在于:通道连接|读取或写入|业务处理均采用多线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。

    kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

    public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
          // 打开服务端 Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          // 打开 Selector
          Selector selector = Selector.open();
    
          // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          // 将 channel 注册到 selector 中.
          // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
          // 注册到 Selector 中.
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                      // 代表客户端的连接
                      // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                      // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                      // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    复制代码

}


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

组合数学

组合数学

Richard A.Brualdi / 机械工业出版社 / 2009-3 / 49.00元

《组合数学(英文版)(第5版)》英文影印版由Pearson Education Asia Ltd.授权机械工业出版社独家出版。未经出版者书面许可,不得以任何方式复制或抄袭奉巾内容。仅限于中华人民共和国境内(不包括中国香港、澳门特别行政区和中同台湾地区)销售发行。《组合数学(英文版)(第5版)》封面贴有Pearson Education(培生教育出版集团)激光防伪标签,无标签者不得销售。Englis......一起来看看 《组合数学》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具