Kafka 源码解析:生产者运行机制

栏目: 后端 · 发布时间: 4年前

内容简介:Kafka 生产者 KafkaProducer 是 kafka 与开发者交互的媒介之一,肩负接收用户自定义消息(这里的消息指代往 kafka 发送的各类数据),并投递给目标 topic 分区的职责。在设计上为了提升消息吞吐量,考量降低与服务端交互的压力等,每次发送消息的请求并非是直接与 kafka 集群进行交互,而是一个异步的过程。当调用在本篇文章中,我们首先来回忆一下 KafkaProducer 的使用方式,然后重点分析消息的收集、缓存、投递,以及响应过程。

Kafka 生产者 KafkaProducer 是 kafka 与开发者交互的媒介之一,肩负接收用户自定义消息(这里的消息指代往 kafka 发送的各类数据),并投递给目标 topic 分区的职责。在设计上为了提升消息吞吐量,考量降低与服务端交互的压力等,每次发送消息的请求并非是直接与 kafka 集群进行交互,而是一个异步的过程。

当调用 KafkaProducer#send 方法发送消息时,实际上只是将消息缓存到了本地的消息收集器中,Kafka 定义了一个 RecordAccumulator 收集器用于收集用户提交的消息数据,同时又在后台维护了一个 sender 线程,以异步的方式循环将收集器中缓存的消息定期定量地投递给 kafka 集群。

在本篇文章中,我们首先来回忆一下 KafkaProducer 的使用方式,然后重点分析消息的收集、缓存、投递,以及响应过程。

一. KafkaProducer 使用示例

KafkaProducer 往 kafka 发送消息需要依赖于客户端 SDK,kafka 提供了 多种语言的客户端 供开发者选择,这里我们以 kafka 内置的 java 客户端为例,介绍如何向 kafka 集群发送消息。示例:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-demo");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);

boolean isAsync = true;

for (int i = 0; i < 10; i++) {
    if (isAsync) {
        // 异步发送消息
        producer.send(new ProducerRecord<>(DEFAULT_TOPIC, i, "zhenchao"), (metadata, e) -> {
            if (null != e) {
                // ... 处理错误
                return;
            }
            printResult(metadata); // 打印结果信息
        });
    } else {
        // 同步发送消息,指定 topic 和消息内容
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(DEFAULT_TOPIC, i, "zhenchao"));
        RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
        this.printResult(metadata); // 打印结果信息

    }
}

示例中发送消息依赖于 KafkaProducer 对象,KafkaProducer 类也是我们分析生产者运行机制的入口。创建该对象时我们需要指定 kafka 集群地址,以及消息 key 和 value 的序列化器,但是客户端 ID 不是必须指定的,后面在分析源码时会看到如果未明确指定客户端 ID,Kafka 会自动为当前客户端创建一个。

接着我们可以调用 KafkaProducer#send 方法向 kafka 集群特定的 topic 投递消息。消息在被投递之前需要封装成 ProducerRecord 对象,该对象封装了当前消息的目标 topic、目标分区,key、value,以及时间戳信息。ProducerRecord 的字段定义如下:

public class ProducerRecord<K, V> {

    /** 主题 */
    private final String topic;
    /** 分区 */
    private final Integer partition;
    /** 消息对应的 key */
    private final K key;
    /** 消息内容 */
    private final V value;
    /** 时间戳 */
    private final Long timestamp;

    // ... 省略方法定义

}

示例中我们定义了 isAsync 参数,需要说明的一点是,isAsync 参数虽然表面意思是指以异步的方式发送消息,但是本质上不管该参数如何设置,Kafka 都只有一种消息发送的方式,即异步发送。参数 isAsync 设置为 true 或者 false 的的意义在于指定如何获取消息发送的响应结果,区别在于:

isAsync=false
isAsync=true

二. 消息收集与发送过程分析

在具体开始分析消息的发送过程之前,我们需要明确 消息发送是一个异步的过程 ,该过程涉及到 2 个线程的协同工作,其中 1 个线程将待发送的消息写入缓冲区(即收集待发送消息),另外 1 个线程(Sender 线程)负责定期定量将缓冲区中的数据投递给 kafka 集群,并反馈投递结果。

2.1 收集待发送的消息

2.1.1 KafkaProducer 的字段定义与构造方法

首先来看一下 KafkaProducer 类的字段定义,如下:

public class KafkaProducer<K, V> implements Producer<K, V> {

    /** clientId 生成器,如果没有明确指定客户端 ID,则使用该字段顺序生成一个 */
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    /** 生产者唯一标识(对应 client.id 属性配置 ) */
    private String clientId;
    /** 分区选择器(对应 partitioner.class 属性配置),如果未明确指定分区,则基于一定的策略为消息选择合适的分区 */
    private final Partitioner partitioner;
    /** 消息的最大长度(对应 max.request.size 配置,包含消息头、序列化之后的 key 和 value) */
    private final int maxRequestSize;
    /** 发送单条消息的缓冲区大小(对应 buffer.memory 配置) */
    private final long totalMemorySize;
    /** kafka 集群元数据 */
    private final Metadata metadata;
    /** 消息收集器,用于收集并缓存消息,等待 Sender 线程的发送 */
    private final RecordAccumulator accumulator;
    /** 消息发送线程对象 */
    private final Sender sender;
    /** 消息发送线程 */
    private final Thread ioThread;
    /** 压缩算法(对应 compression.type 配置) */
    private final CompressionType compressionType;
    /** 时间戳工具 */
    private final Time time;
    /** key 序列化器(对应 key.serializer 配置) */
    private final Serializer<K> keySerializer;
    /** value 序列化器(对应 value.serializer 配置) */
    private final Serializer<V> valueSerializer;
    /** 封装配置信息 */
    private final ProducerConfig producerConfig;
    /** 等待更新 kafka 集群元数据的最大时长 */
    private final long maxBlockTimeMs;
    /** 消息发送的超时时间(从发送到收到 ACK 响应) */
    private final int requestTimeoutMs;
    /** 发送拦截器(对应 interceptor.classes 配置),用于待发送的消息进行拦截并修改,也可以对 ACK 响应进行拦截处理 */
    private final ProducerInterceptors<K, V> interceptors;

    // ... 省略方法定义

}

接下来继续看一下 KafkaProducer 类对象的构造过程,KafkaProducer 提供了多个重载的构造方法,这里来看一下最底层的构造方法:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        // 获取用户配置信息
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = Time.SYSTEM;

        // 尝试获取用户配置的 clientId,如果未配置则基于 PRODUCER_CLIENT_ID_SEQUENCE 顺序生成一个
        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0) {
            // 用户未指定 clientId,基于 PRODUCER_CLIENT_ID_SEQUENCE 顺序生成一个
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        }

        // ... 省略打点相关注册逻辑

        // 获取配置的分区器对象(反射创建)
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        // 获取生产者重试间隔
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

        // 如果参数未指定 key 序列化器,则尝试从配置中获取 key 序列化器对象(反射创建)
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }

        // 如果参数未指定 value 序列化器,则尝试从配置中获取 value 序列化器对象(反射创建)
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

        // 获取注册的拦截器列表
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false))
                .getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners =
                this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);

        // 创建并更新 kafka 集群的元数据信息
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        // 获取并设置生产者发送请求的大小
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        // 获取并设置生产者内存缓冲区大小,用于缓存要发送到服务器的消息
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        // 获取并设置消息压缩算法,可以设置为 snappy、gzip 或 lz4,默认不压缩。
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

        // ... 基于用户配置设置 maxBlockTimeMs 和 requestTimeoutMs,省略

        // 创建消息收集器,用于异步发送消息
        this.accumulator = new RecordAccumulator(
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // 指定每个批次的大小(单位:字节)
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG), // 消息缓存超时发送时间
                retryBackoffMs,
                metrics,
                time);

        // 获取 kafka 集群主机列表
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        // 更新 kafka 集群元数据信息
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        // 创建 NetworkClient 对象,NetworkClient 是 producer 网络 I/O 的核心
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "producer", channelBuilder),
                metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                time,
                true);

        // 创建并启动 Sender 线程
        this.sender = new Sender(
                client,
                metadata,
                accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                metrics,
                Time.SYSTEM,
                requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        // 打印未使用的配置
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // ... 省略异常处理
    }
}

2.1.2 消息收集的过程

了解了 KafkaProducer 的字段定义和对象的构造过程之后,下面正式开始对消息收集的过程进行分析,相关实现位于 KafkaProducer#send 方法中:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // 遍历注册的拦截器对待发送的消息执行拦截修改
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    // 调用 doSend 方法开始发送消息
    return this.doSend(interceptedRecord, callback);
}

该方法只是简单应用了注册的拦截器对发送的消息进行拦截修改,而具体消息收集的过程则封装在 KafkaProducer#doSend 方法中,实现如下:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 1. 获取 kafka 集群元数据信息,如果当前请求的是新 topic,或者指定的分区超过已知的分区范围,则会触发更新集群元数据信息
        ClusterAndWaitTime clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // 2 基于注册的序列化器对 key 执行序列化
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer");
        }

        // 3. 基于注册的序列化器对 value 执行序列化
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer");
        }

        // 4. 为当前消息选择合适的分区,如果未明确指定的话,则基于注册的分区器为当前消息计算分区
        int partition = this.partition(record, serializedKey, serializedValue, cluster);

        /* 5. 将消息追加到消息收集器(RecordAccumulator)中 */

        // 计算当前消息大小,并校验消息是否过大
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        this.ensureValidRecordSize(serializedSize);
        tp = new TopicPartition(record.topic(), partition); // 消息投递的目标 topic 分区
        // 如果未明确为当前消息指定时间戳,则设置为当前时间戳
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        // 追加消息到收集器中
        RecordAccumulator.RecordAppendResult result = accumulator.append(
                tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

        /* 6. 条件性唤醒消息发送线程 */

        // 如果队列中不止一个 RecordBatch,或者最后一个 RecordBatch 满了,或者有创建新的 RecordBatch,则唤醒 Sender 线程发送消息
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // 唤醒 sender 线程,发送消息
            this.sender.wakeup();
        }
        return result.future;
    }
    // ... 省略异常处理
}

我们可以将消息的发送过程概括为以下 6 个步骤:

  1. 获取集群的元数据(Metadata)信息,如果请求的是新 topic,或者指定的分区 ID 超过了已知的合法区间,则触发更新本地缓存的集群元数据信息;
  2. 基于注册的 key 序列化器对消息的 key 执行序列化;
  3. 基于注册的 value 序列化器对消息的 value 执行序列化;
  4. 如果未指定目标 topic 分区,则基于注册的分区器为当前消息计算目标分区;
  5. 缓存消息到消息收集器 RecordAccumulator 中;
  6. 条件性唤醒消息发送线程。

下面逐一对上述过程中的 6 个步骤展开分析。首先来看一下获取集群元数据信息的过程( 步骤 1 ),KafkaProducer 本地会缓存集群的元数据信息,当客户端向集群投递消息时实际上是投递到了目标 topic 指定分区的 leader 副本上。因为集群状态是动态变化的,leader 副本所在的网络位置也会发生迁移,所以客户端在投递消息之前,需要确保本地所缓存的集群信息是最新的,否则需要标记当前集群信息需要更新,具体的更新操作由 sender 线程完成。

KafkaProducer 在发送消息之前会先调用 KafkaProducer#waitOnMetadata 方法获取集群元数据信息,如果感知到本地缓存的集群元数据信息已经过期,则会通知 sender 线程进行更新。首先来看一下保存集群元数据信息的 Metadata 类的字段定义:

public final class Metadata {

    /** 元数据最小更新时间间隔,默认是 100 毫秒,防止更新太频繁 */
    private final long refreshBackoffMs;
    /** 元数据更新时间间隔,默认为 5 分钟 */
    private final long metadataExpireMs;
    /** 元数据版本号,每更新成功一次则版本号加 1 */
    private int version;
    /** 上一次更新元数据的时间戳,不管成功还是失败 */
    private long lastRefreshMs;
    /** 上一次成功更新元数据的时间戳 */
    private long lastSuccessfulRefreshMs;
    /** 集群信息 */
    private Cluster cluster;
    /** 标记是否需要更新集群元数据信息 */
    private boolean needUpdate;
    /** 记录集群中所有的 topic 信息,key 是 topic,value 是 topic 过期的时间戳 */
    private final Map<String, Long> topics;
    /** 元数据更新监听器 */
    private final List<Listener> listeners;
    /** 标记是否需要更新所有 topic 的元数据信息,一般只更新当前用到的 topic 的元数据信息 */
    private boolean needMetadataForAllTopics;
    /** 是否允许 topic 过期 */
    private final boolean topicExpiryEnabled;

    // ... 省略方法定义

}

下面继续来看一下 KafkaProducer#waitOnMetadata 方法的实现:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // 添加 topic 到集合中,如果是新 topic,标记需要更新集群元数据信息
    metadata.add(topic);
    // 获取当前集群信息
    Cluster cluster = metadata.fetch();
    // 获取指定 topic 的分区数目
    Integer partitionsCount = cluster.partitionCountForTopic(topic);

    // 如果参数未指定分区,或指定的分区在当前记录的分区范围之内,则返回历史集群信息
    if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
        return new ClusterAndWaitTime(cluster, 0);
    }

    /* 否则,当前缓存的集群元数据信息可能已经过期,需要进行更新 */

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs; // 剩余等待时间
    long elapsed;

    /* 请求集群的元数据信息,直到获取到信息或者超时 */
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        // 更新 Metadata 的 needUpdate 字段,并获取当前元数据的版本号
        int version = metadata.requestUpdate();
        // 唤醒 sender 线程,由 sender 线程负责更新元数据信息
        sender.wakeup();
        try {
            // 等待元数据更新完成
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // 等待超时
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }

        // 获取更新后的集群信息
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) {
            // 等待超时
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        // 权限检测
        if (cluster.unauthorizedTopics().contains(topic)) {
            throw new TopicAuthorizationException(topic);
        }
        remainingWaitMs = maxWaitMs - elapsed; // 更新剩余等待时间
        partitionsCount = cluster.partitionCountForTopic(topic); // 获取指定 topic 的分区数目
    } while (partitionsCount == null); // 更新集群信息失败,继续重试

    /* 更新集群信息成功 */

    // 参数指定的分区非法
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }

    return new ClusterAndWaitTime(cluster, elapsed);
}

上述方法首先会尝试将当前 topic 加入到本地缓存的 topic 集合中,因为客户端对于 topic 会有一个过期机制,对于长时间未使用的 topic 会从本地缓存中移除。这里一开始调用 Metadata#add 方法除了标记当前 topic 是活跃的之外,另外一个目的在于判断本地是否有该 topic 的缓存信息,如果没有则需要通知 sender 线程更新集群元数据信息。通知的过程实际上只是简单将 Metadata#needUpdate 字段设置为 true,Sender 线程会检查该字段以更新集群元数据信息。

接下来会调用 Metadata#fetch 方法获取集群信息 Cluster 对象,Cluster 类是对集群节点、topic、分区等信息的一个封装,其字段定义如下:

public final class Cluster {

    /** kafka 集群中的节点信息列表(包括 id、host、port 等信息) */
    private final List<Node> nodes;
    /** 未授权的 topic 集合 */
    private final Set<String> unauthorizedTopics;
    /** 内部 topic 集合 */
    private final Set<String> internalTopics;
    /** 记录 topic 分区与分区详细信息的映射关系 */
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    /** 记录 topic 及其分区信息的映射关系 */
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    /** 记录 topic 及其分区信息的映射关系(必须包含 leader 副本) */
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    /** 记录节点 ID 与分区信息的映射关系 */
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    /** key 是 brokerId,value 是 broker 节点信息,方便基于 brokerId 获取对应的节点信息 */
    private final Map<Integer, Node> nodesById;

    // ... 省略方法定义

}

其中 Node、TopicPartition 和 PartitionInfo 类定义比较简单,其作用分别为:

  • Node :封装 kafka 节点信息,包括 ID、主机名,以及端口号等信息。
  • TopicPartition :封装分区摘要信息,包含分区所属 topic 和分区编号。
  • PartitionInfo :封装分区详细信息,包括分区所属 topic、分区编号、leader 副本所在节点、全部副本所在节点列表,以及 ISR 副本所在节点列表。

继续回到 KafkaProducer#waitOnMetadata 方法。接下来方法会判断是否需要更新集群元数据信息,判断的依据是当前本地缓存的目标 topic 的分区数目不为空,同时如果发送消息时明确指定了分区编号,则此编号必须在本地认为合法的分区编号区间范围内。如果能够满足这些条件,则认为本地缓存的集群信息是合法的,可以直接拿来使用,否则就会触发更新集群元数据的逻辑。如果需要更新集群元数据,则会调用 Metadata#requestUpdate 方法设置标记位,同时唤醒 sender 线程进行处理,并等待集群元数据更新完成。判定更新完成的策略就是判定本地缓存的集群元数据的版本号( Metadata#version 字段)是否被更新,因为集群元数据每更新成功一次,版本号会加 1。如果等待过程超时则会抛出 TimeoutException 异常。

回到 KafkaProducer#doSend 方法,在拿到集群信息之后,方法会基于配置的 key 和 value 序列化器分别对消息 ID 和消息内容进行序列化( 步骤 2步骤 3 ),这一过程比较简单。 步骤 4 会为当前消息选择合适的分区,相关实现位于 KafkaProducer#partition 方法中:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // 获取当前待发送消息所指定的分区
    Integer partition = record.partition();
    // 如果未指定分区,则为当前消息计算一个分区编号
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

如果我们在发送消息时明确指定了分区编号,那么这里只是简单的返回该编号,否则就需要基于注册的分区器计算当前消息对应的分区编号。Partitioner 接口是分区器的抽象,我们可以实现该接口自定义分区器,Kafka 也提供了默认的分区器实现 DefaultPartitioner,分区算法实现如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 获取当前 topic 的分区详细信息
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // 获取当前 topic 对应的分区数
    int numPartitions = partitions.size();
    // 如果没有设置 key,则基于轮询算法
    if (keyBytes == null) {
        // 获取当前 topic 对应的上次位置值加 1,如果是第一次则随机生成一个
        int nextValue = this.nextValue(topic);
        // 获取当前 topic 包含 leader 副本的分区详细信息
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }
    // 如果指定了 key,则使用 murmur2 算法对 key 做哈希取模
    else {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

默认分区器 DefaultPartitioner 依据消息的 key 计算分区,如果在发送消息时未指定 key,则默认分区器会基于 Round-Robin 算法计算分区编号,以保证目标 topic 分区的负载均衡。否则会基于 32 位的 murmur2 哈希算法计算 key 的哈希值,并与分区数取模得到最后的分区编号。

步骤 5会计算并校验当前消息的大小,同时为消息附加时间戳,并最终调用 RecordAccumulator#append 方法将消息缓存到收集器 RecordAccumulator 中,等待 sender 线程投递给 kafka 集群。RecordAccumulator 是生产者 SDK 中非常重要的一个类,可以将其看做是一个本地缓存消息的队列,消息收集线程将消息最终记录到收集器中,而 sender 线程会定期定量从收集器中取出缓存的消息,并投递给 kafka 集群。RecordAccumulator 类字段定义如下:

public final class RecordAccumulator {

    /** 标识当前收集器是否被关闭,对应 producer 被关闭 */
    private volatile boolean closed;
    /** 记录正在执行 flush 操作的线程数 */
    private final AtomicInteger flushesInProgress;
    /** 记录正在执行 append 操作的线程数 */
    private final AtomicInteger appendsInProgress;
    /** 指定每个 RecordBatch 中 ByteBuffer 的大小 */
    private final int batchSize;
    /** 消息压缩类型 */
    private final CompressionType compression;
    /** 通过参数 linger.ms 指定,当本地消息缓存时间超过该值时,即使消息量未达到阈值也会进行投递 */
    private final long lingerMs;
    /** 生产者重试时间间隔 */
    private final long retryBackoffMs;
    /** 缓存(ByteBuffer)管理工具 */
    private final BufferPool free;
    /** 时间戳工具 */
    private final Time time;
    /** 记录 topic 分区与 RecordBatch 的映射关系,对应的消息都是发往对应的 topic 分区 */
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    /** 记录未发送完成(即未收到服务端响应)的消息集合 */
    private final IncompleteRecordBatches incomplete;
    /**
     * 消息顺序性保证,
     * 缓存当前待发送消息的目标 topic 分区,防止对于同一个 topic 分区同时存在多个未完成的消息,可能导致消息顺序性错乱
     */
    private final Set<TopicPartition> muted;
    /** 记录 drain 方法批量导出消息时上次的偏移量 */
    private int drainIndex;

    // ... 省略方法定义

}

既然 RecordAccumulator 可以看做是一个消息缓存队列,那么这里先了解一下其消息存储的模式。这其中涉及到 RecordAccumulator、RecordBatch、MemoryRecords 和 MemoryRecordsBuilder 4 个类。从上面 RecordAccumulator 类的字段列表中我们看到有一个 ConcurrentMap<TopicPartition, Deque<RecordBatch>> 类型的 batches 字段,这里的 key 对应 topic 的某个分区,而 value 是一个 Deque 类型,其中封装了一批 RecordBatch 对象,这些对象中记录了待发送的消息集合,而这些消息的一个共同点就是都是发往相同的 topic 分区。RecordBatch 类字段定义如下:

public final class RecordBatch {

    /** 当前 RecordBatch 创建的时间戳 */
    final long createdMs;
    /** 当前缓存的消息的目标 topic 分区 */
    final TopicPartition topicPartition;
    /** 标识当前 RecordBatch 发送之后的状态 */
    final ProduceRequestResult produceFuture;
    /** 消息的 Callback 队列,每个消息都对应一个 Callback 对象 */
    private final List<Thunk> thunks = new ArrayList<>();
    /** 用来存储数据的 {@link MemoryRecords} 对应的 builder 对象 */
    private final MemoryRecordsBuilder recordsBuilder;
    /** 发送当前 RecordBatch 的重试次数 */
    volatile int attempts;
    /** 最后一次重试发送的时间戳` */
    long lastAttemptMs;
    /** 记录保存的 record 个数 */
    int recordCount;
    /** 记录最大的 record 字节数 */
    int maxRecordSize;
    /** 记录上次投递当前 BatchRecord 的时间戳 */
    long drainedMs;
    /** 追后一次向当前 RecordBatch 追加消息的时间戳 */
    long lastAppendTime;
    /** 标记是否正在重试 */
    private boolean retry;

    // ... 省略方法定义

}

我们可以从字段定义中看到 RecordBatch 持有一个 MemoryRecordsBuilder 类型的字段,MemoryRecordsBuilder 是 MemoryRecords 的构造和管理器,也就是说 RecordBatch 本质上是以 MemoryRecords 作为存储介质。

了解了 RecordAccumulator 类在存储模式上的设计之后,我们接下来分析 RecordAccumulator#append 方法的实现:

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // 记录正在向收集器中追加消息的线程数
    appendsInProgress.incrementAndGet();
    try {
        // 获取当前 topic 分区对应的 Deque,如果不存在则创建一个
        Deque<RecordBatch> dq = this.getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed) {
                // producer 已经被关闭了,抛出异常
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            // 向 Deque 中最后一个 RecordBatch 追加 Record,并返回对应的 RecordAppendResult 对象
            RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // 追加成功,直接返回
                return appendResult;
            }
        }

        /* 追加 Record 失败,尝试申请新的 buffer */

        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // 申请新的 buffer
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed) {
                // 再次校验 producer 状态,如果已经被关闭了,抛出异常
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }

            // 再次尝试向 Deque 中最后一个 RecordBatch 追加 Record
            RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // 追加成功则返回,同时归还之前申请的 buffer
                free.deallocate(buffer);
                return appendResult;
            }

            /* 仍然追加失败,创建一个新的 RecordBatch 进行追加 */

            MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
            // 在新创建的 RecordBatch 中追加 Record
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            // 追加到未完成的集合中
            incomplete.add(batch);
            // 封装成 RecordAppendResult 对象返回
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

追加消息到收集器的过程首先会获取指定 topic 分区对应的发送队列,如果不存在则会创建一个。然后同步往该队列的最后一个 RecordBatch 对象中追加数据,追加的过程位于 RecordAccumulator#tryAppend 方法中。如果追加失败,一般都是因为该 RecordBatch 没有足够的空间足以容纳,则方法会尝试申请新的空间,然后继续尝试追加。如果还是失败,则方法会创建一个新的 RecordBatch 对象进行追加。整个过程多次调用到 RecordAccumulator#tryAppend 方法,下面来看一下该方法的实现:

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    // 获取 deque 的最后一个 RecordBatch
    RecordBatch last = deque.peekLast();
    if (last != null) {
        // 尝试往该 RecordBatch 末尾追加消息
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null) {
            // 追加失败
            last.close();
        } else {
            // 追加成功,将结果封装成 RecordAppendResult 对象返回
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
    }
    return null;
}

// org.apache.kafka.clients.producer.internals.RecordBatch#tryAppend
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    // 检测是否还有多余的空间容纳该消息
    if (!recordsBuilder.hasRoomFor(key, value)) {
        // 没有多余的空间则直接返回,后面会尝试申请新的空间
        return null;
    }
    // 添加当前消息到 MemoryRecords,并返回消息对应的 CRC32 校验码
    long checksum = this.recordsBuilder.append(timestamp, key, value);
    // 更新最大 record 字节数
    this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
    // 更新最后一次追加记录时间戳
    this.lastAppendTime = now;
    FutureRecordMetadata future = new FutureRecordMetadata(
            produceFuture, recordCount,
            timestamp, checksum,
            key == null ? -1 : key.length,
            value == null ? -1 : value.length);
    if (callback != null) {
        // 如果指定了 Callback,将 Callback 和 FutureRecordMetadata 封装到 Trunk 中
        thunks.add(new Thunk(callback, future));
    }
    this.recordCount++;
    return future;
}

上面过程最终调用 MemoryRecordsBuilder#append 方法将消息追加到 MemoryRecords 相应的位置进行存储,并返回消息的 CRC32 校验码,至于 MemoryRecords 存储消息的细节这里不再继续深入。消息追加成功之后,如果在发送消息时指定了 Callback 函数,那么这里会将其封装成 Thunk 类对象,至于其作用这里先不展开分析,等到后面分析 sender 线程的执行过程时再一探究竟,这里初步猜测 sender 线程在向集群投递完消息并收到来自集群的响应时,会循环遍历 thunks 集合,并应用 Callback 对应的回调方法。

回到 KafkaProducer#doSend 方法,来看最后一步( 步骤 6 )。上面追加的过程会返回一个 RecordAppendResult 对象,该对象通过 RecordAppendResult#batchIsFullRecordAppendResult#newBatchCreated 两个字段分别标记了追加过程中末端的 RecordBatch 是否已满,以及追加过程中是否有创建新的 RecordBatch 对象,如果这两个条件满足其中之一,则会唤醒 sender 线程尝试向集群投递收集的消息数据。

2.2 投递待发送的消息

前面曾提出一个概念,即客户端发送消息的过程实际上是一个异步的过程,由 2 个线程协同执行,其中 1 个线程将待发送的消息写入缓冲区,另外 1 个线程(Sender 线程)负责定期定量将缓冲区中的数据投递给远端 kafka 集群,并反馈投递结果。上面我们分析了过程 1,下面我们继续分析过程 2,即将缓存的消息发送给 kafka 集群。

这一过程由 sender 线程负责执行,前面的分析中曾多次唤醒过该线程,下面来看一下其实现,位于 Sender 类中,该类实现了 java.lang.Runnable 接口,其 Sender#run 方法实现如下:

public void run() {

    // 主循环,一直运行直到 KafkaProducer 被关闭
    while (running) {
        try {
            this.run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    /* 如果 KafkaProducer 被关闭,尝试发送剩余的消息 */
    while (!forceClose // 不是强制关闭
            // 存在未发送或已发送待响应的请求
            && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            this.run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // 如果是强制关闭,忽略所有未发送和已发送待响应的请求
    if (forceClose) {
        // 丢弃所有未发送完成的消息
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // 关闭网络连接
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
}

由上述方法实现可知,Sender 线程在启动后会一直循环执行另外一个重载版本的 Sender#run 方法,其中包含了 sender 线程的主要逻辑。如果客户端被关闭(一般都是调用 KafkaProducer#close 方法),在不是强制关闭的前提下,Sender 线程会继续处理本地未发送和已发送但未收到服务端确认的消息,如果是强制关闭(在调用 KafkaProducer#close 方法时允许指定超时等待时间,如果在既定时间内客户端仍未完成对缓存消息的处理,则会触发强制关闭机制),则会丢弃本地缓存的所有未发送的消息,最后关闭到 kafka 集群的网络连接。

下面来看一下 sender 线程的核心实现,即重载版本的 Sender#run 方法:

void run(long now) {

    // 1. 计算需要以及可以向哪些节点发送请求
    Cluster cluster = metadata.fetch(); // 获取 kafka 集群信息
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // 计算需要向哪些节点发送请求

    // 2. 如果存在未知的 leader 副本对应的节点(对应的 topic 分区正在执行 leader 选举,或者对应的 topic 已经失效),标记需要更新缓存的集群元数据信息
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics) this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // 3. 遍历处理待发送请求的目标节点,基于网络 IO 检查对应节点是否可用,对于不可用的节点则剔除
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // 检查目标节点是否准备好接收请求,如果未准备好但目标节点允许创建连接,则创建到目标节点的连接
        if (!this.client.ready(node, now)) {
            // 对于未准备好的节点,则从 ready 集合中删除
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // 4. 获取每个节点待发送消息集合,其中 key 是目标 leader 副本所在节点 ID
    Map<Integer, List<RecordBatch>> batches =
            this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

    // 5. 如果需要保证消息的强顺序性,则缓存对应 topic 分区对象,防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息
    if (guaranteeMessageOrder) {
        // 将所有 RecordBatch 的 topic 分区对象加入到 muted 集合中
        // 防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // 6. 处理本地过期的消息,返回 TimeoutException,并释放空间
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);

    // 如果存在待发送的消息,则设置 pollTimeout 等于 0,这样可以立即发送请求,从而能够缩短剩余消息的缓存时间,避免堆积
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    // 7. 发送请求到服务端,并处理服务端响应
    this.sendProduceRequests(batches, now);
    this.client.poll(pollTimeout, now);
}

发送收集器 RecordAccumulator 中缓存的消息的整体执行流程可以概括为如下 7 个步骤:

  1. 计算需要向哪些 broker 节点投递消息;
  2. 如果步骤 1 中发现一些 topic 分区的 leader 副本所在 broker 节点失效,则需要标记更新本地缓存的集群元数据信息;
  3. 遍历处理步骤 1 中获取到的 broker 节点集合,基于 IO 检测对应节点是否可用,如果不可用则剔除;
  4. 以 broker 节点 ID 为键,获取发往目标节点的消息集合;
  5. 如果需要对消息顺序进行强一致性保证,则需要缓存当前目标 topic 分区对象,防止同一时间往同一个 topic 分区发送多条处于未完成状态的消息;
  6. 处理本地已过期的消息,返回超时异常,并释放占据的空间;
  7. 发送消息到服务端,并处理服务端的响应。

下面就各个步骤展开说明,首先来看 步骤 1 ,该步骤用于计算需要向哪些节点投递消息,实现位于 RecordAccumulator#ready 方法中:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // 用于记录接收请求的节点
    Set<Node> readyNodes = new HashSet<>();
    // 记录下次执行 ready 判断的时间间隔
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // 记录找不到 leader 副本的分区对应的 topic 集合
    Set<String> unknownLeaderTopics = new HashSet<>();

    // 是否有线程在等待 BufferPool 分配空间
    boolean exhausted = this.free.queued() > 0;
    // 遍历每个 topic 分区及其 RecordBatch 队列,对每个分区的 leader 副本所在的节点执行判定
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        // 获取当前 topic 分区 leader 副本所在的节点
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // 当前分区 leader 副本未知,但存在发往该分区的消息
            if (leader == null && !deque.isEmpty()) {
                unknownLeaderTopics.add(part.topic());
            }
            // 如果需要保证消息顺序性,则不应该存在多个发往该 leader 副本节点且未完成的消息
            else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // 当前为重试操作,且重试时间间隔未达到阈值时间
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs; // 重试等待的时间
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;

                    // 标记当前节点是否可以接收请求
                    boolean sendable = full // 1. 队列中有多个 RecordBatch,或第一个 RecordBatch 已满
                            || expired // 2. 当前等待重试的时间过长
                            || exhausted // 3. 有其他线程在等待 BufferPoll 分配空间,即本地消息缓存已满
                            || closed // 4. producer 已经关闭
                            || flushInProgress(); // 5. 有线程正在等待 flush 操作完成
                    if (sendable && !backingOff) {
                        // 允许发送消息,且当前为首次发送,或者重试等待时间已经较长,则记录目标 leader 副本所在节点
                        readyNodes.add(leader);
                    } else {
                        // 更新下次执行 ready 判定的时间间隔
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    // 封装结果返回
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

整个计算的逻辑就是遍历我们之前缓存到收集器 RecordAccumulator 中的消息集合,并按照下面 5 个条件进行判定,如果满足其中一个则认为需要往目标节点投递消息:

  1. 当前 topic 名下的消息队列持有多个 RecordBatch,或者第 1 个 RecordBatch 已满;
  2. 当前 topic 分区等待重试的时间过长,如果是首次发送则无需校验重试等待时间;
  3. 当前 topic 分区下有其他线程在等待 BufferPoll 分配空间,即本地缓存已满;
  4. Producer 被关闭,需要立即投递剩余未完成的消息;
  5. 有线程正在等待 flush 操作完成,则需要立即投递消息,避免线程等待时间过长。

如果遍历过程中发现某个 topic 分区对应的 leader 副本所在节点失效(对应的 topic 分区正在执行 leader 副本选举,或者对应的 topic 已经失效),但是本地又缓存了发往该分区的消息,则需要标记当前本地缓存的集群元数据需要更新( 步骤 2 )。上面获取目标 broker 节点的过程是站在收集器 RecordAccumulator 的角度看的,对于一个节点是否可用,还需要从网络 IO 的角度检查其连通性,这也是 步骤 3 所要做的工作,这一步基于 KafkaClient#ready 方法检查目标节点的是否连通,如果目标节点并未准备好接收请求,则需要从待请求节点集合中剔除。

知道了需要向哪些节点投递消息,接下来自然而然就需要获取发往每个节点的数据, 步骤 4 的实现位于 RecordAccumulator#drain 方法中:

public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty()) {
        return Collections.emptyMap();
    }

    // 记录转换后的结果,key 是目标节点 ID
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        // 获取当前节点上的分区信息
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        // 记录待发往当前节点的 RecordBatch 集合
        List<RecordBatch> ready = new ArrayList<>();
        /*
         * drainIndex 用于记录上次发送停止的位置,本次继续从当前位置开始发送,
         * 如果每次都是从 0 位置开始,可能会导致排在后面的分区饿死,可以看做是一个简单的负载均衡策略
         */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // 如果需要保证消息强顺序性,则不应该同时存在多个发往目标分区的消息
            if (!muted.contains(tp)) {
                // 获取当前分区对应的 RecordBatch 集合
                Deque<RecordBatch> deque = this.getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            // 重试 && 重试时间间隔未达到阈值时间
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // 仅发送第一次发送,或重试等待时间较长的消息
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // 单次消息数据量已达到上限,结束循环,一般对应一个请求的大小,防止请求消息过大
                                    break;
                                } else {
                                    // 每次仅获取第一个 RecordBatch,并放入 read 列表中,这样给每个分区一个机会,保证公平,防止饥饿
                                    RecordBatch batch = deque.pollFirst();
                                    // 将当前 RecordBatch 设置为只读
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            // 更新 drainIndex
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}

上述方法的返回类型是 Map<Integer, List<RecordBatch>> ,其中 key 是目标节点的 ID,value 是本次待发往该节点的消息集合。为了防止饥饿,方法会轮询从当前 topic 的每个分区队列对头取数据,并记录每次轮询的偏移量,下次轮询即从该偏移量位置开始,以保证尽量的公平。

下面来看一下 步骤 5 ,这是客户端保证消息绝对有序的逻辑。在具体分析之前,我们先来看一个导致消息顺序错乱的场景。假设生产者发送了 2 条指向同一个目标 topic 分区的消息 A 和 B,但是 A 发送失败,B 却成功了,此时生产者会重发消息 A,结果就变成了 B 消息排在了 A 消息的前面。解决该问题的方法就是将参数 max.in.flight.requests.per.connection 参数设置为 1,以禁止生产者往同一个分区一次发送多条消息,不过这样会严重降低系统吞吐量,只有在对消息顺序有严格要求时才推荐这样做。步骤 5 的参数 guaranteeMessageOrder=true 对应着 max.in.flight.requests.per.connection=1 ,客户端解决上述问题的实现方式也很简单,就是在本地缓存有处于发送中消息对应的目标 topic 分区对象,保证该分区上的消息在被正确响应之前不会再投递第 2 条消息。

下面继续来看 步骤 6 ,这一步会遍历收集器 RecordAccumulator 中缓存的 RecordBatch,并调用 RecordBatch#maybeExpire 方法检测当前 RecordBatch 是否过期,对于已经过期的 RecordBatch 会执行相应的 RecordBatch#done 方法(下一步中会对该方法展开说明),并释放占用的内存空间。

最后我们来看一下消息发送的过程( 步骤 7 ),位于 Sender#sendProduceRequests 方法中:

private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    // 遍历处理待发送消息集合,key 是目标节点 ID
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        this.sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    // 遍历 RecordBatch 集合,整理成 produceRecordsByPartition 和 recordsByPartition
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    // 创建 ProduceRequest 请求构造器
    ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);

    // 创建回调对象,用于处理响应
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        @Override
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);

    // 创建 ClientRequest 请求对象,如果 acks 不等于 0 则表示期望获取服务端响应
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // 缓存 ClientRequest 请求对象到 InFlightRequests 中
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

这一步主要逻辑就是创建客户端请求 ClientRequest 对象,并通过 NetworkClient#send 方法将请求加入到网络 IO 通道(KafkaChannel)中,同时将该对象缓存到 InFlightRequests 中,等接收到服务端响应时会通过缓存的 ClientRequest 对象调用对应的 callback 方法。最后调用 NetworkClient#poll 方法执行具体的网络请求和响应,下面来看一下该方法的具体实现:

public List<ClientResponse> poll(long timeout, long now) {
    /*
     * 如果距离上次更新超过指定时间,且存在负载小的目标节点,
     * 则创建 MetadataRequest 请求更新本地缓存的集群元数据信息,并在下次执行 poll 操作时一并送出
     */
    long metadataTimeout = metadataUpdater.maybeUpdate(now);

    /* 发送网络请求 */
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    /* 处理服务端响应 */

    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>(); // 响应队列
    // 添加需要被丢弃的请求对应的响应到 responses 队列中,都是一些版本不匹配的请求
    this.handleAbortedSends(responses);
    // 对于发送成功且不期望服务端响应的请求,创建本地的响应对象添加到 responses 队列中
    this.handleCompletedSends(responses, updatedNow);
    /*
     * 获取并解析服务端响应
     * - 如果是更新集群元数据对应的响应,则更新本地缓存的集群元数据信息
     * - 如果是更新 API 版本的响应,则更新本地缓存的目标节点支持的 API 版本信息
     * - 否则,获取 ClientResponse 添加到 responses 队列中
     */
    this.handleCompletedReceives(responses, updatedNow);
    // 处理连接断开的请求,构建对应的 ClientResponse 添加到 responses 列表中,并标记需要更新集群元数据信息
    this.handleDisconnections(responses, updatedNow);
    // 处理 connections 列表,更新相应节点的连接状态
    this.handleConnections();
    // 如果需要更新本地的 API 版本信息,则创建对应的 ApiVersionsRequest 请求,并在下次执行 poll 操作时一并送出
    this.handleInitiateApiVersionRequests(updatedNow);
    // 遍历获取 inFlightRequests 中的超时请求,构建对应的 ClientResponse 添加到 responses 列表中,并标记需要更新集群元数据信息
    this.handleTimedOutRequests(responses, updatedNow);

    // 遍历处理响应对应的 onComplete 方法
    for (ClientResponse response : responses) {
        try {
            // 本质上就是在调用注册的 RequestCompletionHandler#onComplete 方法
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }

    return responses;
}

整个方法的执行流程可以概括为 4 个步骤:

Selector#poll
Selector#poll
RequestCompletionHandler#onComplete

首先来看更新本地缓存的集群元数据信息的过程( 步骤 1 ),前面曾多次提及到更新集群元数据的场景,而这些更新操作实际上都是标记集群元数据需要更新,真正执行更新的操作则发生在这里。实现位于 DefaultMetadataUpdater#maybeUpdate 方法中:

public long maybeUpdate(long now) {
    // 获取下次更新集群信息的时间戳
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // 检查是否已经发送了 MetadataRequest 请求
    long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
    // 计算当前距离下次发送 MetadataRequest 请求的时间差
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) {
        // 如果时间还未到,则暂时不更新
        return metadataTimeout;
    }

    // 寻找负载最小的可用节点,如果没有可用的节点则返回 null
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    // 检查是否允许向目标节点发送请求,如果允许则创建 MetadataRequest 请求,并在下次执行 poll 操作时一并送出
    return this.maybeUpdate(now, node);
}

private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    // 如果允许向该节点发送请求
    if (canSendRequest(nodeConnectionId)) {
        // 标识正在请求更新集群元数据信息
        this.metadataFetchInProgress = true;
        // 创建集群元数据请求 MetadataRequest 对象
        MetadataRequest.Builder metadataRequest;
        if (metadata.needMetadataForAllTopics()) {
            // 需要更新所有 topic 的元数据信息
            metadataRequest = MetadataRequest.Builder.allTopics();
        } else {
            // 仅需更新指定 topic 的元数据信息
            metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
        }

        // 将 MetadataRequest 包装成 ClientRequest 进行发送,在下次执行 poll 操作时一并发送
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return requestTimeoutMs;
    }

    /* 不允许向目标节点发送请求 */

    // 如果存在到目标节点的连接,则等待一会,无需再次尝试创建新的连接
    if (isAnyNodeConnecting()) {
        return reconnectBackoffMs;
    }

    /* 如果不存在到目标节点连接 */

    // 如果允许创建到目标节点的连接,则初始化连接
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now); // 初始化连接
        return reconnectBackoffMs;
    }

    return Long.MAX_VALUE;
}

方法首先会依据之前设置的标记,以及上次的更新时间决定是否需要更新集群元数据信息,如果需要则依据本地记录的已发往服务端的请求数目寻找集群中负载最小且可用的节点,并创建对应的 MetadataRequest 请求,但是这里的请求不是立即发出的,而是将请求包装成 ClientRequest 对象,并在下次 Selector#poll 操作时一并送出,也就是接下去即将执行的步骤 2。

步骤 2是真正发送网络请求的地方,这里的请求是异步的,客户端在发出请求之后继续执行步骤 3。 步骤 3 的逻辑主要是为每一个 ClientRequest 请求构造对应的 ClientResponse 响应对象,这些响应对象有的是依据服务端的响应进行构造,有的则是在本地伪造,因为不是所有的请求都需要等待服务端的响应,也不是所有的请求都能得到服务端的响应。这一步的实现对应了一系列的 handle* 方法:

  • handleAbortedSends
  • handleCompletedSends
  • handleCompletedReceives
  • handleDisconnections
  • handleConnections
  • handleInitiateApiVersionRequests
  • handleTimedOutRequests

下面逐一来看一下相应方法的实现。

  • handleAbortedSends

该方法的实现就是简单的将 NetworkClient#abortedSends 字段中记录的 ClientResponse 响应对象添加到结果集合中,并清空该字段。这些 ClientResponse 对象是在 NetworkClient#doSend 时添加的,添加的原因是本地请求与目标节点所支持的 API 版本不匹配。

  • handleCompletedSends

该方法会遍历客户端已经发送成功的请求,对于那些不期望服务端响应的请求可以直接创建对应的 ClientResponse 响应对象,并添加到结果集合中。实现如下:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    for (Send send : this.selector.completedSends()) {
        // 获取缓存到 inFlightRequests 集合中的请求对象
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // 检测请求是否期望响应
        if (!request.expectResponse) {
            // 当前请求不期望服务端响应,则从 inFlightRequests 集合中删除
            this.inFlightRequests.completeLastSent(send.destination());
            // 为当前请求生成 ClientResponse 对象
            responses.add(request.completed(null, now));
        }
    }
}
  • handleCompletedReceives

该方法会获取并解析服务端的响应结果,并依据响应类型分别处理。实现如下:

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // 获取返回响应的节点 ID
        String source = receive.source();
        // 从 inFlightRequests 集合中获取缓存的 ClientRequest 对象
        InFlightRequest req = inFlightRequests.completeNext(source);
        // 解析响应
        AbstractResponse body = parseResponse(receive.payload(), req.header);
        log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
        if (req.isInternalRequest && body instanceof MetadataResponse) {
            // 如果是更新集群元数据对应的响应,则更新本地的缓存的集群元数据信息
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        } else if (req.isInternalRequest && body instanceof ApiVersionsResponse) {
            // 如果是更新 API 版本的响应,则更新本地缓存的目标节点支持的 API 版本信息
            this.handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        } else {
            // 否则,获取 ClientResponse 响应对象添加到队列中
            responses.add(req.completed(body, now));
        }
    }
}

如果当前是针对之前请求更新集群元数据信息的响应,则会调用 DefaultMetadataUpdater#handleCompletedMetadataResponse 方法解析响应内容,如果响应正常则会调用 Metadata#update 方法更新本地缓存的集群元数据信息。如果当前是针对请求更新本地 API 版本信息的响应,则会调用 NetworkClient#handleApiVersionsResponse 方法更新本地缓存的目标节点支持的 API 版本信息。对于其它类型的响应,则直接封装成 ClientResponse 对象添加到结果集合中。

  • handleDisconnections

该方法会调用 Selector#disconnected 方法获取断开连接的节点 ID 集合,并更新相应节点的连接状态为 DISCONNECTED ,同时会清空本地缓存的与该节点相关的数据,最终创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中。如果这一步确实发现了已断开的连接,则标记需要更新本地缓存的节点元数据信息。

  • handleConnections

该方法会调用 Selector#connected 方法获取连接正常的节点 ID 集合,如果当前节点是第一次建立连接,则需要获取节点支持的 API 版本信息,方法会将当前节点的连接状态设置为 CHECKING_API_VERSIONS ,并将节点 ID 添加到 NetworkClient#nodesNeedingApiVersionsFetch 集合中,对于其它节点,则更新相应连接状态为 READY

  • handleInitiateApiVersionRequests

该方法用于处理 NetworkClient#handleConnections 方法中标记的需要获取支持的 API 版本信息的节点,即记录到 NetworkClient#nodesNeedingApiVersionsFetch 集合中的节点。方法会遍历处理集合中的节点,并在判断目标节点允许接收请求的情况下,构建 ApiVersionsRequest 请求以获取目标节点支持的 API 版本信息,该请求会被包装成 ClientRequest 对象,并在下次 Selector#poll 操作时一并送出。

  • handleTimedOutRequests

该方法会遍历缓存在 inFlightRequests 中已经超时的相关请求对应的节点集合,针对此类节点将其视作断开连接进行处理。方法会创建一个 disconnected 类型的 ClientResponse 对象添加到结果集合中,并标记需要更新本地缓存的集群元数据信息。

在完成了将各种类型请求对应的响应对象 ClientResponse 添加到结果集合中之后,会继续遍历该集合并应用 ClientResponse#onComplete 方法,该方法最终调用的是我们注册的 RequestCompletionHandler 对应的 RequestCompletionHandler#onComplete 方法。我们在分析 Sender#sendProduceRequest 方法时曾遇到过下面这一段代码:

RequestCompletionHandler callback = new RequestCompletionHandler() {
    @Override
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

实际上在调用 ClientResponse#onComplete 方法时本质上也就是在调用 Sender#handleProduceResponse 方法,该方法所做的工作就是区分当前的响应类型,并针对每一种响应类型设置对应的参数并回调 Sender#completeBatch 方法,区别仅在于方法的 response 参数设置:

response=new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)
response=new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST)
response=new ProduceResponse.PartitionResponse(Errors.NONE)

下面来看一下 Sender#completeBatch 方法的具体实现:

private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {
    Errors error = response.error;
    // 异常响应,但是允许重试
    if (error != Errors.NONE && this.canRetry(batch, error)) {
        log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, retries - batch.attempts - 1, error);
        // 将消息重新添加到收集器中,等待再次发送
        this.accumulator.reenqueue(batch, now);
    }
    // 正常响应,或不允许重试的异常
    else {
        RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            // 权限认证失败
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        } else {
            // 其他异常,如果是正常响应,则为 null
            exception = error.exception();
        }
        // 将响应信息传递给用户,并释放 RecordBatch 占用的空间
        batch.done(response.baseOffset, response.logAppendTime, exception);
        this.accumulator.deallocate(batch);
    }

    // 如果是集群元数据异常,则标记需要更新集群元数据信息
    if (error.exception() instanceof InvalidMetadataException) {
        metadata.requestUpdate();
    }

    // 释放已经处理完成的 topic 分区,对于需要保证消息强顺序性,以允许接收下一条消息
    if (guaranteeMessageOrder) {
        this.accumulator.unmutePartition(batch.topicPartition);
    }
}

上述方法会判断当前响应是否异常且可以需要重试,如果是则将 RecordBatch 重新添加到收集器 RecordAccumulator 中,等待再次发送。如果是正常响应或不允许重试,则调用 RecordBatch#done 方法结束本次发送消息的过程,并将响应结果传递给用户,同时释放 RecordBatch 占用的空间。下面来看一下方法 RecordBatch#done 的实现:

public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception);

    // 标识当前 RecordBatch 已经处理完成
    if (completed.getAndSet(true)) {
        throw new IllegalStateException("Batch has already been completed");
    }

    // 设置当前 RecordBatch 发送之后的状态
    produceFuture.set(baseOffset, logAppendTime, exception);

    // 循环执行每个消息的 Callback 回调
    for (Thunk thunk : thunks) {
        try {
            // 消息处理正常
            if (exception == null) {
                // RecordMetadata 是服务端返回的
                RecordMetadata metadata = thunk.future.value();
                thunk.callback.onCompletion(metadata, null);
            }
            // 消息处理异常
            else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }

    // 标记本次请求已经完成(正常响应、超时,以及关闭生产者)
    produceFuture.done();
}

前面我们曾分析过在消息追加成功之后,如果在发送消息时指定了 Callback 回调函数,会将其封装成 Thunk 类对象,当时我们猜测 sender 线程在向集群投递完消息并收到来自集群的响应时,会循环遍历 thunks 集合,并应用 Callback 相应的回调方法,而上述方法的实现证实了我们的猜想。

方法中的变量 produceFuture 是一个 ProduceRequestResult 类型的对象,用于表示一次消息生产过程是否完成,该类基于 CountDownLatch 实现了类似 Future 的功能,在构造 ProduceRequestResult 对象时会创建一个大小为 1 的 CountDownLatch 对象,并在调用 ProduceRequestResult#done 方法时执行 CountDownLatch#countDown 操作,而 ProduceRequestResult#completed 方法判定消息发送是否完成的依据就是判定 CountDownLatch 对象值是否等于 0。

三. 总结

本文我们介绍了 java 版本的 KafkaProducer 的使用,并深入分析了相关设计和实现。从执行流程上来说,Kafka 生产者运行机制在整体设计上还是比较简单和直观的,但不可否认在实现上也有很多需要注意的细节。Kafka 在老版本的 SDK 中默认使用同步的方式往服务端投递消息,因为采用异步的方式存在消息丢失的问题,直到 0.8.2.0 版本以后才修复了这一问题,并将异步提交设置为默认方式。

了解 KafkaProducer 的设计和实现能够帮助我们在实际开发中更好的使用 kafka 生产者客户端,知晓如何能够保证消息的强顺序性,以及如何保证消息不丢失,甚至是利用其它编程语言自定义 SDK。下一篇,我们将继续分析消费者的运行机制。

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据结构与算法分析

数据结构与算法分析

韦斯 (Mark Allen Weiss) / 陈越 / 机械工业出版社 / 2016-3-1 / 69.00元

本书是国外数据结构与算法分析方面的经典教材,使用卓越的Java编程语言作为实现工具讨论了数据结构(组织大量数据的方法)和算法分析(对算法运行时间的估计)。本书把算法分析与有效率的Java程序的开发有机地结合起来,深入分析每种算法,内容全面、缜密严格,并细致讲解精心构造程序的方法。一起来看看 《数据结构与算法分析》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

SHA 加密
SHA 加密

SHA 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具