获取Kafka Consumer的offset

栏目: 服务器 · Apache · 发布时间: 6年前

内容简介:获取Kafka Consumer的offset

从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储Consumer的offset。Kafka现在是将Consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取Consumer的offset。为了提升访问速度,kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。

Kafka的官方文档描述了这个特性是如何工作的,以及如何将offset从zookeeper迁移到kafka。下面的代码演示了如何利用基于kafka存储offset的特性。

第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager:

import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.util.*;
 
...
 
    try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
 
        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }
    
...

第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager:

 // How to commit offsets
 
        long now = System.currentTimeMillis();
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
        offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                MY_GROUP,
                offsets,
                correlationId++,
                MY_CLIENTID,
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                for (partitionErrorCode: commitResponse.errors().values()) {
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                      channel.disconnect();
                      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
                    } else {
                        // log and retry the commit
                    }
                }
            }
        }
        catch (IOException ioe) {
            channel.disconnect();
            // Go to step 1 and then retry the commit
        }
 
 // How to fetch offsets
 
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                MY_GROUP,
                partitions,
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                MY_CLIENTID);
        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                // Go to step 1 and retry the offset fetch
            } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) {
                // retry the offset fetch (after backoff)
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
            }
        }
        catch (IOException e) {
            channel.disconnect();
            // Go to step 1 and then retry offset fetch after backoff
        }

####


以上所述就是小编给大家介绍的《获取Kafka Consumer的offset》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

微信小程序(开发入门及案例详解)

微信小程序(开发入门及案例详解)

李骏、边思 / 机械工业出版社 / 2017-3-1 / 59.0

本书可分为3部分,第一部分作为基础章节,介绍了第一个小程序的搭建流程,让大家能快速上手;同时对小程序框架原理进行了详细介绍,为后面学习组件、API打下基础。 第二部分对小程序组件、API进行介绍,对组件、API的使用、注意事项进行详细讲解,并给出示例代码。 最后一部分精选5个由浅入深的案例,对小程序研发进行实战讲解,涵盖了实际项目中可能涉及的技术方案和使用方法,具备很强的实战意义。 ......一起来看看 《微信小程序(开发入门及案例详解)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具