librdkafka: 如何设置Kafka消费者订阅消息的起始偏移位置

栏目: IT技术 · 发布时间: 4年前

内容简介:librdkafka: 如何设置Kafka消费者订阅消息的起始偏移位置

 本文主要介绍使用kafka C语言接口库,如何设置消费者订阅消息的起始偏移。

01

缺省配置

默认情况下,Kafka消费者从最后一次提交的偏移量位置(offset)开始消费消息,如果Topic+Partition和Group之前没有提交过偏移量,它订阅消息开始位置取决于Topic的配置属性auto.offset.reset的设置。默认为最新(latest),也就是在分区末尾开始消耗(仅消费新消息)。相关配置可以参考官方文档:https://kafka.apache.org/documentation/#topicconfigs

方便查阅,截个图:

librdkafka: 如何设置Kafka消费者订阅消息的起始偏移位置

02


相关接口信息

librdkafka提供了assign() API,通过设置rd_kafka_topic_partition_t的.offset属性,你可以指定每一个Partition的起始偏移。偏移量可以是一个绝对的偏移(>0),或逻辑偏移 (BEGINNING, END, STORED, TAIL(…))。

rdkafka.h头文件中定义了Partition的管理结构rd_kafka_topic_partition_t,包含offset信息;同时提供了逻辑偏移的定义RD_KAFKA_OFFSET_XXX。

/** * @brief Generic place holder for a specific Topic+Partition. * * @sa rd_kafka_topic_partition_list_new() */typedef struct rd_kafka_topic_partition_s {        char        *topic;             /**< Topic name */        int32_t      partition;         /**< Partition */      int64_t      offset;            /**< Offset */        void        *metadata;          /**< Metadata */        size_t       metadata_size;     /**< Metadata size */        void        *opaque;            /**< Application opaque */        rd_kafka_resp_err_t err;        /**< Error code, depending on use. */        void       *_private;           /**< INTERNAL USE ONLY,                                         *   INITIALIZE TO ZERO, DO NOT TOUCH */} rd_kafka_topic_partition_t;
////////////////////////////////////////////////////////////#define RD_KAFKA_OFFSET_BEGINNING -2 /**< Start consuming from beginning of * kafka partition queue: oldest msg */#define RD_KAFKA_OFFSET_END -1 /**< Start consuming from end of kafka * partition queue: next msg */#define RD_KAFKA_OFFSET_STORED -1000 /**< Start consuming from offset retrieved * from offset store */#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */

/** @cond NO_DOC */#define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use *//** @endcond */
/** * @brief Start consuming \p CNT messages from topic's current end offset. * * That is, if current end offset is 12345 and \p CNT is 200, it will start * consuming from offset \c 12345-200 = \c 12145. */#define RD_KAFKA_OFFSET_TAIL(CNT)  (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))

通过rd_kafka_assign()函数接口可以配置需要消费的Partition信息。

/** * @brief Atomic assignment of partitions to consume. * * The new \p partitions will replace the existing assignment. * * When used from a rebalance callback the application shall pass the * partition list passed to the callback (or a copy of it) (even if the list * is empty) rather than NULL to maintain internal join state.
* A zero-length \p partitions will treat the partitions as a valid, * albeit empty, assignment, and maintain internal state, while a \c NULL * value for \p partitions will reset and clear the internal state. */RD_EXPORT rd_kafka_resp_err_trd_kafka_assign (rd_kafka_t *rk,                 const rd_kafka_topic_partition_list_t *partitions);

03


如何配置offset

对于消费者来说,有两个场景来修改订阅的Parttion offset信息:一是系统初始化时直接指定offset信息,二是消费者群组重平衡(rebalance)的回调函数。接下来分别介绍一下。

系统初始化时,指定offset示例:

rd_kafka_topic_partition_list_t *partitions;partitions = rd_kafka_topic_partition_list_new(0);rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;rd_kafka_assign(rk, partitions);rd_kafka_topic_partition_list_destroy(partitions);  

rebalance_cb()回调函数中,指定offset示例。

void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,                      rd_kafka_topic_partition_list_t *partitions, void *opaque) {   if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {       rd_kafka_topic_partition_t *part;       if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))           part->offset = 1234;       rd_kafka_assign(rk, partitions);   }  else {       rd_kafka_assign(rk, NULL);   }}

04


更多精彩文章请订阅公众号

librdkafka: 如何设置Kafka消费者订阅消息的起始偏移位置


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

移动风暴

移动风暴

[美]弗雷德·沃格尔斯坦 / 朱邦芊 / 中信出版社 / 2014-1-1 / 39

也许,除了伟大的乔布斯,每一位奋力改变世界的硅谷英雄,都值得我们肃然起敬。苹果与谷歌十年博弈,关于这场移动平台战争的报道早已铺天盖地,而这是第一次,我们能听到幕后工程师的真实声音。两大科技巨人用智能手机和平板电脑颠覆了电脑产业。它们位处变革的中心,凭借各自的经营哲学、魅力领袖和商业敏感度,把竞争变成了残酷对决。商业记者沃格尔斯坦报道这场对抗已逾十载,在《移动风暴》中,他带领我们来到一间间办公室和会......一起来看看 《移动风暴》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具