Kafka 源码解析:网络交互模型

栏目: 后端 · 发布时间: 4年前

内容简介:由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Proce

由上一篇分析可以知道,在 broker 节点启动过程中会创建一个 SocketServer 类型的对象,并调用其 SocketServer#startup 方法执行组件的启动过程。SocketServer 是 kafka 对外提供网络服务的核心实现类,在 kafka 运行过程中用于接收来自客户端和其它 broker 节点的网络请求。考虑到性能上的需求,SocketServer 采用了Reactor 模式,并基于 java NIO 实现。

参考如下示意图,Kafka 为 broker 所在宿主机的每一张网卡创建并绑定了一个 Acceptor 组件,用于接收并处理所有的连接请求;每个 Acceptor 组件维护多个 Processor 线程,其中每个 Processor 拥有专属的 Selector,用于从连接中读取请求和写回响应;每个 Acceptor 组件同时维护多个 Handler 线程,用于处理请求并生成响应传递给 Processor,而 Handler 与 Processor 之间通过请求队列进行通信。

Kafka 源码解析:网络交互模型

一. SocketServer 组件

SocketServer 是整个 kafka server 网络模型的管家类,主要用于构建和启动整个网络模块。SocketServer 类的字段定义如下:

class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

    /** 封装服务器对应的多张网卡,kafka 可以同时监听这些 IP 和端口,每个 EndPoint 对应一个 Acceptor */
    private val endpoints: Map[ListenerName, EndPoint] = config.listeners.map(l => l.listenerName -> l).toMap
    /** 每个 Acceptor 对应的 Processor 对应的线程数 */
    private val numProcessorThreads = config.numNetworkThreads
    /** broker 节点上 Processor 线程总数 */
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    /** 请求队列中缓存的最大请求个数 */
    private val maxQueuedRequests = config.queuedMaxRequests
    /** 每个 IP 允许创建的最大连接数 */
    private val maxConnectionsPerIp = config.maxConnectionsPerIp
    /** 针对特定 IP 指定的允许创建的最大连接数,会覆盖 maxConnectionsPerIp 配置 */
    private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
    /** Processor 线程与 Handler 线程之间交换数据的通道 */
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    /** Acceptor 对象集合,每个 EndPoint 对应一个 Acceptor */
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    /** Processor 对象集合,封装所有的 Processor 对象 */
    private val processors = new Array[Processor](totalProcessorThreads)
    /** 用于控制每个 IP 上的最大连接数 */
    private var connectionQuotas: ConnectionQuotas = _

    // ... 省略方法定义

}

各字段的含义参考注释,其中 EndPoint 类用于封装服务器对应的 host、port,以及网络协议等信息,而 RequestChannel 类定义了 Processor 和 Handler 之间交换数据的通道,该类的字段定义如下:

class RequestChannel(val numProcessors: Int, // Processor 线程总数
                     val queueSize: Int // 请求队列的大小
                    ) extends KafkaMetricsGroup {

    /** 响应监听器列表,当 Handler 往响应队列写回响应数据时唤醒对应的 Processor 线程进行处理 */
    private var responseListeners: List[Int => Unit] = Nil
    /** 请求队列,所有的 Processor 共用一个 */
    private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
    /** 响应队列,每个 Processor 对应一个响应队列 */
    private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)

    // ... 省略方法定义

}

RequestChannel 封装了请求队列和响应队列,这里需要注意的一点是请求队列是 Processor 线程共享的,而响应队列则是每个 Processor 线程专属的。Processor 负责将读取到的请求写入请求队列中,并从自己的响应队列中取出响应对象发送给请求方。Handler 负责从请求队列中读取请求进行处理,并在处理完成之后将响应对象写入到之前读取该请求的 Processor 的响应队列中。关于 Acceptor、Processor 和 Handler 的实现下文会专门进行分析,这里我们先来看一下 SocketServer 的启动逻辑,位于 SocketServer#startup 方法中,实现如下:

def startup() {
    synchronized {

        // 创建控制 IP 最大连接数的 ConnectionQuotas 对象
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

        // 指定 socket send buffer 的大小(对应 socket.send.buffer.bytes 配置)
        val sendBufferSize = config.socketSendBufferBytes
        // 指定 socket receive buffer 的大小(对应 socket.receive.buffer.bytes 配置)
        val recvBufferSize = config.socketReceiveBufferBytes
        // 获取 broker 节点 ID
        val brokerId = config.brokerId

        var processorBeginIndex = 0
        // 遍历为每个 EndPoint,创建并绑定对应的 Acceptor 和 Processor
        config.listeners.foreach { endpoint =>
            val listenerName = endpoint.listenerName
            val securityProtocol = endpoint.securityProtocol
            val processorEndIndex = processorBeginIndex + numProcessorThreads

            // 按照指定的 processor 线程数,为每个 EndPoint 创建对应数量的 Processor 对象,
            // 编号区间 [processorBeginIndex, processorEndIndex)
            for (i <- processorBeginIndex until processorEndIndex)
                processors(i) = this.newProcessor(i, connectionQuotas, listenerName, securityProtocol)

            // 为当前 EndPoint 创建并绑定一个 Acceptor 对象
            val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
                processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
            acceptors.put(endpoint, acceptor)

            // 启动 Acceptor 线程
            Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()

            // 主线程等待 Acceptor 线程启动完成
            acceptor.awaitStartup()

            processorBeginIndex = processorEndIndex
        }
    }

    info("Started " + acceptors.size + " acceptor threads")
}

SocketServer 启动过程中会遍历为当前 broker 节点上的每张网卡创建并绑定对应 Acceptor 对象,然后按照配置的 Processor 线程数(对应 num.network.threads 配置)为每个 Acceptor 创建并绑定对应数量的 Processor 实例,最后启动 Acceptor 线程。

二. Acceptor 组件

Acceptor 主要负责接收来自客户端和其它 broker 节点的请求,并创建对应的 socket 连接交由 Processor 进行处理。Acceptor 类的字段定义如下:

private[kafka] class Acceptor(val endPoint: EndPoint, // 对应的网卡信息
                              val sendBufferSize: Int, // socket send buffer size
                              val recvBufferSize: Int, // socket receive buffer size
                              brokerId: Int, // broker 节点 id
                              processors: Array[Processor], // 绑定的 Processor 线程集合
                              connectionQuotas: ConnectionQuotas // 控制 IP 连接数的对象
                             ) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

    /** NIO Selector */
    private val nioSelector = NSelector.open()
    /** ServerSocketChannel 对象,监听对应网卡的指定端口 */
    val serverChannel: ServerSocketChannel = this.openServerSocket(endPoint.host, endPoint.port)

    // ... 省略方法定义

}

SocketServer 在启动过程中会创建并启动 Acceptor 线程,由上面的定义可以看出 Acceptor 继承自 AbstractServerThread 抽象类,而 AbstractServerThread 实现了 Runnable 接口,并提供了对线程的基本管理方法。Acceptor 的具体执行逻辑位于 Acceptor#run 方法中:

def run() {
    // 注册监听 OP_ACCEPT 事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    // 标记当前线程启动完成,以便 SocketServer 能够继续为其它网卡创建并绑定对应的 Acceptor 线程
    this.startupComplete()
    try {
        var currentProcessor = 0 // 当前生效的 processor 编号
        while (isRunning) {
            try {
                // 等待关注的事件
                val ready = nioSelector.select(500)
                if (ready > 0) {
                    val keys = nioSelector.selectedKeys()
                    val iter = keys.iterator()
                    // 遍历处理接收到的请求
                    while (iter.hasNext && isRunning) {
                        try {
                            val key = iter.next
                            iter.remove()
                            // 如果是 OP_ACCEPT 事件,则调用 accept 方法进行处理
                            if (key.isAcceptable)
                                this.accept(key, processors(currentProcessor))
                            else
                                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                            // 基于轮询算法选择下一个 Processor 处理下一次请求,负载均衡
                            currentProcessor = (currentProcessor + 1) % processors.length
                        } catch {
                            case e: Throwable => error("Error while accepting connection", e)
                        }
                    }
                }
            } catch {
                case e: ControlThrowable => throw e
                case e: Throwable => error("Error occurred", e)
            }
        }
    } finally {
        debug("Closing server socket and selector.")
        this.swallowError(serverChannel.close())
        this.swallowError(nioSelector.close())
        this.shutdownComplete()
    }
}

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    // 创建 SocketChannel 对象
    val socketChannel = serverSocketChannel.accept()
    try {
        // 增加对应 IP 上的连接数,如果连接数超过阈值,则抛 TooManyConnectionsException 异常
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        // 配置 SocketChannel 对象,非阻塞模式
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socketChannel.socket().setSendBufferSize(sendBufferSize)

        // 将 SocketChannel 交给 Processor 进行处理
        processor.accept(socketChannel)
    } catch {
        // 连接数过多,关闭当前通道上的连接,并将连接计数减 1
        case e: TooManyConnectionsException =>
            info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
            this.close(socketChannel)
    }
}

上述方法的执行逻辑是一个典型的 NIO server 的实现。Acceptor 会循环监听 OP_ACCEPT 事件,当有新的连接请求到达时会创建并配置连接对应的 SocketChannel 对象,并交由 Processor 处理(调用 Processor#accept 方法)。我们知道一个 Acceptor 上绑定了多个 Processor 线程,为了保证各个 Processor 的负载均衡,这里使用了简单的轮询算法,逐个选择 Processor 线程处理请求。

对于新进来的请求,Acceptor 首先会使用 ConnectionQuotas 对象管理请求 IP 上的连接数,并在连接数超过配置的阈值(默认对应 max.connections.per.ip 配置,可以通过 max.connections.per.ip.overrides 配置覆盖默认配置)时触发限流机制,关闭当前连接的通道。

三. Processor 组件

Processor 主要负责读取来自请求方的请求,并向请求方发送响应,但是本身不负责对请求进行处理,而是委托给相应的 Handler 线程进行处理。Processor 中几个重要的字段定义如下:

/** Processor 与 Handler 线程之间传递请求数据的队列 */
val requestChannel: RequestChannel
/** 记录分配给当前 Processor 的待处理的 SocketChannel 对象 */
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
/** 缓存未发送给客户端的响应,由于客户端不会进行确认,所以服务端在发送成功之后会将其移除 */
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()

Acceptor 线程在收到连接请求之后,会将请求封装成 SocketChannel 对象,并调用 Processor#accept 方法将其分配给对应的 Processor 线程进行处理,该对象会被记录到 Processor#newConnections 字段中,并唤醒对应的 Processor 线程。方法 Processor#accept 的实现如下:

def accept(socketChannel: SocketChannel) {
    // 将 Acceptor 分配的 SocketChannel 对象缓存到同步队列中
    newConnections.add(socketChannel)
    // 唤醒 Processor 线程处理队列
    this.wakeup() // 本质上调用 NIO Server 的 wakeup 方法
}

Processor 同样继承了 AbstractServerThread 抽象类,所以也是一个线程类实现。在创建 Acceptor 对象的过程中会遍历启动分配给当前 Acceptor 的 Processor 线程。

synchronized {
    // 遍历启动分配给当前 Acceptor 的 Processor 线程
    processors.foreach { processor =>
        Utils.newThread(
            s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
            processor, false).start()
    }
}

Processor 的 Processor#run 方法在线程启动之后,会一直循环处理 Acceptor 分配的请求,读取并封装请求数据到队列中,然后等待 Handler 线程处理。对于已经处理完成的请求对应的响应对象,Processor 线程会依据响应类型分而治之。方法 Processor#run 的实现如下:

override def run() {
    // 标识当前线程启动完成
    this.startupComplete()
    while (isRunning) {
        try {
            // 1. 遍历获取分配给当前 Processor 的 SocketChannel 对象,注册 OP_READ 事件
            this.configureNewConnections()

            // 2. 遍历处理当前 Processor 的响应队列,依据响应类型进行处理
            this.processNewResponses()

            // 3. 发送缓存的响应对象给客户端
            this.poll()

            // 4.
            // 遍历处理 poll 操作放置在 Selector 的 completedReceives 队列中的请求,
            // 封装请求信息为 Request 对象,并记录到请求队列中等待 Handler 线程处理,
            // 同时标记当前 Selector 暂时不再接收新的请求
            this.processCompletedReceives()

            // 5.
            // 遍历处理 poll 操作放置在 Selector 的 completedSends 队列中的请求,
            // 将其从 inflightResponses 集合中移除,并标记当前 Selector 可以继续读取数据
            this.processCompletedSends()

            // 6.
            // 遍历处理 poll 操作放置在 Selector 的 disconnected 集合中的断开的连接,
            // 将连接对应的所有响应从 inflightResponses 中移除,同时更新对应 IP 的连接数
            this.processDisconnected()
        } catch {
            case e: ControlThrowable => throw e
            case e: Throwable =>
                error("Processor got uncaught exception.", e)
        }
    }

    debug("Closing selector - processor " + id)
    // 关闭所有的连接以及选择器
    this.swallowError(closeAll())
    this.shutdownComplete()
}

当 Processor 线程启动完成后会调用 Processor#startupComplete 方法标识当前线程启动完成,然后开始进入循环,依次执行以下操作:

OP_READ

下面对各个步骤逐一进行深入分析,首先来看 步骤 1 ,实现位于 Processor#configureNewConnections 方法中:

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
        // 获取待处理 SocketChannel 对象
        val channel = newConnections.poll()
        try {
            debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
            val localHost = channel.socket().getLocalAddress.getHostAddress
            val localPort = channel.socket().getLocalPort
            val remoteHost = channel.socket().getInetAddress.getHostAddress
            val remotePort = channel.socket().getPort
            val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
            // 注册 OP_READ 事件
            selector.register(connectionId, channel)
        } catch {
            // 对于不致命的异常,则捕获并关闭对应的通道
            case NonFatal(e) =>
                val remoteAddress = channel.getRemoteAddress
                this.close(channel)
                error(s"Processor $id closed connection from $remoteAddress", e)
        }
    }
}

前面我们曾介绍过 Acceptor 会将请求对应的 SocketChannel 对象记录到 Processor#newConnections 字段中,而这一步的主要任务就是遍历处理这些 SocketChannel 对象,分别将 Processor 对应的 Selector 注册到这些通道上(对应 OP_READ 事件),用于读取请求数据。

步骤 2会遍历消费当前 Processor 的响应队列,按照响应的类型分别处理,实现位于 Processor#processNewResponses 方法中:

private def processNewResponses() {
    // 获取当前 Processor 的响应队列
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
        try {
            // 依据响应类型对响应进行处理
            curr.responseAction match {
                // 暂时没有响应需要发送,如果对应的通道未被关闭,则继续注册 OP_READ 事件读取请求数据
                case RequestChannel.NoOpAction =>
                    curr.request.updateRequestMetrics()
                    trace("Socket server received empty response to send, registering for read: " + curr)
                    val channelId = curr.request.connectionId
                    if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                        selector.unmute(channelId) // 注册 OP_READ 事件
                // 当前响应需要发送给请求方
                case RequestChannel.SendAction =>
                    // 发送该响应,并将响应对象记录到 inflightResponses 集合中
                    this.sendResponse(curr)
                // 需要关闭当前连接
                case RequestChannel.CloseConnectionAction =>
                    curr.request.updateRequestMetrics()
                    trace("Closing socket connection actively according to the response code.")
                    // 关闭连接
                    this.close(selector, curr.request.connectionId)
            }
        } finally {
            // 获取下一个待处理的响应
            curr = requestChannel.receiveResponse(id)
        }
    }
}

我们知道 Processor 本身不负责处理请求,它只是封装请求交由 Handler 线程进行处理,同时每一个 Processor 会维护一个响应队列,Handler 线程在处理完请求之后会将对应的响应对象放置到对应 Processor 的响应队列中,而这一步会遍历处理该响应队列,并依据响应类型分而治之:

  1. 如果当前没有响应需要处理,那么会重新在对应的通道上注册 OP_READ 事件,以继续读取新的请求数据。
  2. 如果当前的响应需要发送给请求方,则会调用 Processor#sendResponse 方法发送响应,并将响应对象记录到 Processor#inflightResponses 字段中,表示该响应对象正在被发送。
  3. 如果当前的响应类型表示需要关闭对应的连接,则会调用 Processor#close 方法关闭对应的通道,并更新对应 IP 上的连接数。

步骤 3会发送步骤 2 缓存的响应请求,并将读取到的请求、已经发送成功的请求,以及断开的连接分别放置到 Selector 的 completedReceives、completedSends 和 disconnected 集合中,而步骤 4 至 6 的逻辑则分别对应处理这 3 个集合。首先看一下 步骤 4 ,相应实现位于 Processor#processCompletedReceives 方法中:

private def processCompletedReceives() {
    // 遍历处理接收到的请求
    selector.completedReceives.asScala.foreach { receive =>
        try {
            // 获取请求对应的通道
            val openChannel = selector.channel(receive.source)
            // 创建通道对应的 Session 对象,用于权限控制
            val session = {
                // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
                val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
                RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
            }
            // 封装请求信息为 Request 对象
            val req = RequestChannel.Request(
                processor = id,
                connectionId = receive.source,
                session = session,
                buffer = receive.payload,
                startTimeMs = time.milliseconds,
                listenerName = listenerName,
                securityProtocol = securityProtocol)
            // 将请求对象放入请求队列中,等待 Handler 线程处理
            requestChannel.sendRequest(req)
            // 取消注册的 OP_READ 事件,处理期间不再接收新的请求(即不读取新的请求数据)
            selector.mute(receive.source)
        } catch {
            case e@(_: InvalidRequestException | _: SchemaException) =>
                // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
                error(s"Closing socket for ${receive.source} because of error", e)
                close(selector, receive.source)
        }
    }
}

这一步会遍历处理 Selector 的 completedReceives 集合,对于收到的请求对象会读取请求数据,并封装成 Request 对象记录到请求队列 Processor#requestChannel 中,等待 Handler 线程处理,同时取消之前注册到对应通道的 OP_READ 事件,在处理完成之前不再读取新的请求数据。这里调用了 RequestChannel#sendRequest 方法将 Request 对象放置到一个被 Processor 共享的请求队列中,后续 Handler 线程会消费该队列处理对应的请求。

步骤 5会遍历处理 Selector 的 completedSends 集合,其中存放了已经发送成功的响应,对于这些响应可以从 Processor#inflightResponses 中移除,实现如下:

private def processCompletedSends() {
    // 遍历处理已经完全发送出去的请求
    selector.completedSends.asScala.foreach { send =>
        // 因为当前响应已经发送成功,从 inflightResponses 中移除,不需要客户端确认
        val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        // 注册 OP_READ 事件,继续读取请求数据
        selector.unmute(send.destination)
    }
}

步骤 6会遍历处理 Selector 的 disconnected 集合,对于已经断开的连接,将本地记录的待发送完成的响应对象从 Processor#inflightResponses 中移除,同时更新对应 IP 上的连接数,实现如下:

private def processDisconnected() {
    // 遍历处理已经断开的连接
    selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        // 将连接对应的所有响应从 inflightResponses 中移除
        inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
        // 对应的通道已经被关闭,所以需要减少对应 IP 上的连接数
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
}

四. Handler 组件

Processor 在将对应的 Request 请求对象记录到全局共享的请求队列之后,Handler 线程会消费该队列并处理对应的请求,同时将处理完成的请求对应的响应对象写入到之前读取该请求的 Processor 的响应队列中。Handler 的实现由 KafkaRequestHandler 和 KafkaRequestHandlerPool 两个类构成,其中 KafkaRequestHandlerPool 是对 KafkaRequestHandler 的封装,提供了对 Handler 线程的管理。KafkaRequestHandlerPool 的实现比较简单,我们主要来看一下 KafkaRequestHandler 的实现,KafkaRequestHandler 实现了 Runnable 接口,其 KafkaRequestHandler#run 方法实现如下:

override def run() {
    while (true) {
        try {
            var req: RequestChannel.Request = null
            while (req == null) {
                val startSelectTime = time.nanoseconds
                // 从请求队列中获取 Processor 封装的请求
                req = requestChannel.receiveRequest(300)
                val idleTime = time.nanoseconds - startSelectTime
                aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
            }

            // 如果是 AllDone 请求,则退出当前线程
            if (req eq RequestChannel.AllDone) {
                debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
                return
            }
            req.requestDequeueTimeMs = time.milliseconds
            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
            // 处理请求,将响应写回到对应 Processor 的响应队列中,并唤醒 Processor 线程
            apis.handle(req)
        } catch {
            case e: Throwable => error("Exception when handling request", e)
        }
    }
}

上述方法诠释了 Handler 的全部运行逻辑,首先调用 RequestChannel#receiveRequest 方法超时等待从全局请求队列中获取请求对象,如果获取到的请求对象是 RequestChannel.AllDone 类型,则说明当前请求退出相应线程,否则 Handler 线程会调用 KafkaApis#handle 方法对请求进行处理,并将响应结果写入到对应 Processor 的响应队列中。

KafkaApis 类是 kafka 中的一个核心类实现,用于分发各种类型的请求给到相应的组件,针对每一种请求都定义了相应的方法进行处理,上面调用 KafkaApis#handle 方法实现如下:

def handle(request: RequestChannel.Request) {
    try {
        trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
                format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
        // 依据请求类型分发请求
        ApiKeys.forId(request.requestId) match {
            // 处理 ProduceRequest 请求
            case ApiKeys.PRODUCE => handleProducerRequest(request)
            // 处理 FetchRequest 请求
            case ApiKeys.FETCH => handleFetchRequest(request)
            // 处理 ListOffsetRequest 请求
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
            // 处理 MetadataRequest 请求
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            // 处理 LeaderAndIsrRequest 请求
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            // 处理 StopReplicaRequest 请求
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            // 处理 UpdateMetadataRequest 请求
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
            // 处理 ControlledShutdownRequest 请求
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            // 处理 OffsetCommitRequest 请求
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            // 处理 OffsetFetchRequest 请求
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            // 处理 GroupCoordinatorRequest 请求
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
            // 处理 JoinGroupRequest 请求
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            // 处理 HeartbeatRequest 请求
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            // 处理 LeaveGroupRequest 请求
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            // 处理 SyncGroupRequest 请求
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            // 处理 DescribeGroupsRequest 请求
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            // 处理 ListGroupsRequest 请求
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            // 处理 SaslHandshakeRequest 请求
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            // 处理 ApiVersionsRequest 请求
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            // 处理 CreateTopicsRequest 请求
            case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
            // 处理 DeleteTopicsRequest 请求
            case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
    } catch {
        // ... 省略异常处理
    } finally
        request.apiLocalCompleteTimeMs = time.milliseconds
}

枚举类 ApiKeys 为每一种请求类型定义了一个唯一的标识,KafkaApis 会依据具体的请求类型,将请求委托给对应的 handle* 方法进行处理,这些方法基本的执行逻辑可以概括为:

  1. 解析获取相应类型的请求对象;
  2. 权限校验;
  3. 委托对应的组件处理请求;
  4. 发送响应,或定义响应回调函数,并由具体的组件回调执行。

相应的实现这里先不展开,后续分析具体组件时再针对性分析。

五. 总结

本文我们分析了 kafka 的网络交互模型设计与实现,考虑到客户端与集群之间,以及 broker 节点之间的交互均基于请求进行通信,所以必须保证网络交互这一块的低延迟和高性能。相比于传统的“thread-per-connection”线程模型,Kafka 采用了 reactor 模式以满足实际的需求,并借助于 java NIO 进行实现。整个网络交互模型主要分为 Acceptor、Processor 和 Handler 三大组件,其中 Acceptor 负责接收请求,Processor 负责解析请求并发送响应,而具体的请求处理过程则交由 Handler 负责,其中的设计思想值得我们在开发自己的项目中借鉴。

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

人人都是产品经理——写给产品新人

人人都是产品经理——写给产品新人

苏杰 / 电子工业出版社 / 2017-6 / 66.60

《人人都是产品经理——写给产品新人》为经典畅销书《人人都是产品经理》的内容升级版本,和《人人都是产品经理2.0——写给泛产品经理》相当于上下册的关系。对于大量成长起来的优秀互联网产品经理、众多想投身产品工作的其他岗位从业者,以及更多有志从事这一职业的学生而言,这《人人都是产品经理——写给产品新人》曾是他们记忆深刻的启蒙读物、思想基石和行动手册。作者以分享经历与体会为出发点,以“朋友间聊聊如何做产品......一起来看看 《人人都是产品经理——写给产品新人》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具