Setting an Interceptor on a Kafka Producer to Collect Message Statistics

Category: Backend · Published: 5 years ago


When a Kafka producer sends a message via producer.send(), the message is first placed into a local buffer; a Kafka network thread then pulls messages out of the buffer and delivers them to the Kafka broker. The buffer size is configured by buffer.memory and defaults to 32 MB (32 * 1024 * 1024L). If messages are put into the buffer faster than the network can drain them, the buffer fills up and can accept no more records; at that point the block.on.buffer.full setting decides whether to block or to throw an exception, with blocking producer.send() being the default. The blocking time is governed by max.block.ms, which defaults to 60 seconds. Since producer.send() returns a Future&lt;RecordMetadata&gt;, each send() call made against a full buffer may wait up to 60 seconds before yielding a result (or an exception).
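As a quick aside, the two buffer-related settings can be sketched with plain properties. The property names below are Kafka's actual configuration keys; the class name is made up for illustration, and in real code these entries would be passed to new KafkaProducer&lt;&gt;(props):

```java
import java.util.Properties;

public class BufferConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // buffer.memory: total bytes the producer may use for buffering; default 32 MB
        props.put("buffer.memory", String.valueOf(32 * 1024 * 1024L));
        // max.block.ms: how long send() may block once the buffer is full; default 60 s
        props.put("max.block.ms", "60000");

        System.out.println("buffer.memory=" + props.getProperty("buffer.memory")); // 33554432
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));   // 60000
    }
}
```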

The pipeline is send() --a--> buffer --b--> delivery to the Kafka broker, so flow control naturally belongs between a and b: if b is too slow and the buffer fills up, a has to be slowed down. Throttling a based on how much of the buffer is currently in use would also work; that is left for future study. This article offers an alternative approach: configure an Interceptor on the Producer that roughly counts how many records have been submitted to the buffer and how many have been taken out of it.
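That flow-control idea can be sketched with two counters, one for a and one for b, pausing the producer whenever their gap grows too wide. The class name and threshold below are hypothetical, not part of this article's code:

```java
import java.util.concurrent.atomic.LongAdder;

public class BackPressureGauge {
    private final LongAdder submitted = new LongAdder(); // a: records put into the buffer
    private final LongAdder delivered = new LongAdder(); // b: records sent (or failed)
    private final long maxInFlight;                      // hypothetical threshold

    BackPressureGauge(long maxInFlight) { this.maxInFlight = maxInFlight; }

    void onSubmit()  { submitted.increment(); }
    void onDeliver() { delivered.increment(); }

    // true when a has run too far ahead of b and the caller should slow down
    boolean shouldSlowDown() {
        return submitted.longValue() - delivered.longValue() >= maxInFlight;
    }

    public static void main(String[] args) {
        BackPressureGauge gauge = new BackPressureGauge(3);
        gauge.onSubmit();
        gauge.onSubmit();
        gauge.onSubmit();                           // gap is now 3
        System.out.println(gauge.shouldSlowDown()); // prints "true"
        gauge.onDeliver();                          // gap back to 2
        System.out.println(gauge.shouldSlowDown()); // prints "false"
    }
}
```

The two increments would be driven from onSend() and onAcknowledgement() respectively, which is exactly the pair of hooks the interceptor below provides.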

All of Kafka's configuration constants can be found at https://kafka.apache.org/0100/javadoc/constant-values.html. The description of interceptor.classes says that a Producer can be configured with one or more Interceptors (each implementing ProducerInterceptor). Consumers have their own Interceptor as well (implementing ConsumerInterceptor).

ProducerInterceptor has four interface methods:

  1. void close(): called when the interceptor is closed, before the Producer shuts down
  2. ProducerRecord<K,V> onSend(ProducerRecord<K, V> record): called by KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) before the key and value are serialized and before a partition is assigned (if none was specified) -- in other words, before the record is placed into the buffer. This method may modify the record.
  3. void onAcknowledgement(RecordMetadata metadata, Exception exception): called after a record taken from the buffer has been successfully sent over the network, or after sending has failed
  4. void configure(Map<String, ?> configs): one last chance to adjust the configuration before the KafkaProducer is created

The following code demonstrates counting the records that were submitted to the buffer, delivered successfully, and failed.

Producer code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        String topic = "test_topic";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "blog.yanbin.StatisticsProducerInterceptor");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>(topic, String.valueOf(i), String.valueOf(i)));
        }

        producer.close();

        logger.info(StatisticsProducerInterceptor.getRecordStatistics());
    }
}

Above, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG specifies an Interceptor implementation class, StatisticsProducerInterceptor, whose code follows:

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatisticsProducerInterceptor implements ProducerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(StatisticsProducerInterceptor.class);

    private static final LongAdder submittedRecords = new LongAdder();
    private static final LongAdder deliveredRecords = new LongAdder();
    private static final LongAdder failedRecords = new LongAdder();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        ProducerRecord<String, String> updatedRecord = record.value().compareTo("3") < 0 ? record :
            new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                record.value() + "+U"); // demonstrate modifying the record

        logger.info("record: {} to be sent, updated value from {} to {}",
            updatedRecord, record.value(), updatedRecord.value());

        submittedRecords.increment(); // if the record ultimately cannot be serialized, it never reaches the buffer, and onAcknowledgement() is triggered with an exception
        return updatedRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            deliveredRecords.increment();
            logger.info("sent message: topic: {}, partition: {}, offset: {}, timestamp: {}, checksum: {}",
                metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp(), metadata.checksum());
        } else {
            failedRecords.increment();
            logger.error("failed to send message: {}", metadata, exception);
        }

        logger.info(getRecordStatistics());
    }

    @Override
    public void close() {
        logger.info("producer closed");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        logger.info("configuration: {}", configs);
    }

    public static String getRecordStatistics() {
        return String.format("record statistics, submitted: %s, delivered: %s, failed: %s",
            submittedRecords.longValue(), deliveredRecords.longValue(), failedRecords.longValue());
    }
}

Running it produces output roughly like this:

2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=2, value=2, timestamp=null) to be sent, updated value from 2 to 2
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=3, value=3+U, timestamp=null) to be sent, updated value from 3 to 3+U
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=4, value=4+U, timestamp=null) to be sent, updated value from 4 to 4+U
2018-11-01 00:33:23 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6351, timestamp: 1541050403463, checksum: 1478612472
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 1, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6352, timestamp: 1541050403475, checksum: 4199907714
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 2, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6353, timestamp: 1541050403475, checksum: 3855131286
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 3, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6354, timestamp: 1541050403475, checksum: 1502822821
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 4, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6355, timestamp: 1541050403475, checksum: 3673351358
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 5, failed: 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 00:33:23 [main] INFO Main - record statistics, submitted: 5, delivered: 5, failed: 0

The log shows a total of 5 records submitted, 5 delivered, and 0 failed, and confirms that records can be modified inside onSend(..). It also looks as if all records were placed into the buffer before any were actually sent; changing the loop count in main to 10 behaves much the same, with all 10 onSend(..) calls completing before any record really goes out over the network. Note, however, that onSend(..) and onAcknowledgement(..) are invoked by different threads, so no ordering between them should be assumed.
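This is also why the interceptor keeps its counters in LongAdder rather than a plain long: they are bumped concurrently from the caller's thread (onSend) and the network thread (onAcknowledgement). A minimal illustration of that pattern, with a made-up class name standing in for the two threads:

```java
import java.util.concurrent.atomic.LongAdder;

public class TwoThreadCounters {
    public static void main(String[] args) throws InterruptedException {
        LongAdder submitted = new LongAdder();
        LongAdder delivered = new LongAdder();

        // One thread plays the role of onSend() on the caller's thread,
        // the other plays onAcknowledgement() on the network thread.
        Thread network = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) delivered.increment();
        });
        network.start();
        for (int i = 0; i < 100_000; i++) submitted.increment();
        network.join();

        // Both totals are exact despite the concurrent updates.
        System.out.println("submitted=" + submitted.longValue()
            + ", delivered=" + delivered.longValue());
    }
}
```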

If you doubt that, try sending 2000 records at once: change the loop count in main to 2000, run it, and inspect the log again. Here is an excerpt:

2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=821, value=821+U, timestamp=null) to be sent, updated value from 820 to 820+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 821, delivered: 1, failed: 0
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 19357, timestamp: 1541052100757, checksum: 791494235
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=855, value=855+U, timestamp=null) to be sent, updated value from 855 to 855+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 855, delivered: 2, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1612, value=1612, timestamp=null) to be sent, updated value from 1612 to 1612
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 1611, delivered: 242, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1614, value=1614, timestamp=null) to be sent, updated value from 1614 to 1614
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1999, value=1999, timestamp=null) to be sent, updated value from 1999 to 1999
2018-11-01 01:01:40 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 1999, failed: 0
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 21355, timestamp: 1541052100856, checksum: 2489747570
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 2000, failed: 0
2018-11-01 01:01:41 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 01:01:41 [main] INFO Main - record statistics, submitted: 2000, delivered: 2000, failed: 0

This log shows that pulling records out of the buffer and sending them over the network does not wait for all records to be buffered first; the two activities run on different threads. In the end, though, every record was successfully delivered to the Kafka broker.

With a ProducerInterceptor we can still fairly accurately count the records queued for sending versus those successfully delivered to the network. If a record cannot be serialized, onAcknowledgement(..) is triggered directly with an exception and the record is counted as failed, which is exactly the result we want.

