kafka0.9 Consumer poll()方法阻塞

栏目: 后端 · 发布时间: 6年前

内容简介:kafka0.9 Consumer poll()方法阻塞

最近项目中用到了Kafka0.9,在使用0.9的Consumer API的时候遇到了poll()方法阻塞的问题。程序没有报任何错误,只是持续在poll()方法处阻塞。深入poll()方法可以看到是在AbstractCoordinator.ensureCoordinatorKnown()方法中出现了死循环。在循环中不停地输出如下DEBUG日志:

DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker 2

DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1495335769027, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5c3bd550, request=RequestSend(header={api_key=10,api_version=0,correlation_id=90,client_id=consumer-1}, body={group_id=stats_consume_group}), createdTimeMs=1495335768924, sendTimeMs=1495335768924), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})

DEBUG main org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=91,client_id=consumer-1}, body={topics=[statistic_test]}), isInitiatedByNetworkClient, createdTimeMs=1495335769027, sendTimeMs=0) to node 2

DEBUG main org.apache.kafka.clients.Metadata - Updated cluster metadata version 47 to Cluster(nodes = [Node(0, kafka-node, 9092), Node(2, sz-pg-adtracking-bigdisk-003, 9092), Node(1, sz-pg-adtracking-bigdisk-002, 9092)], partitions = [Partition(topic = statistic_test, partition = 1, leader = 2, replicas = [2,1,], isr = [1,2,], Partition(topic = statistic_test, partition = 2, leader = 0, replicas = [0,2,], isr = [0,2,], Partition(topic = statistic_test, partition = 0, leader = 1, replicas = [1,0,], isr = [0,1,], Partition(topic = statistic_test, partition = 5, leader = 0, replicas = [0,1,], isr = [0,1,], Partition(topic = statistic_test, partition = 6, leader = 1, replicas = [1,0,], isr = [0,1,], Partition(topic = statistic_test, partition = 3, leader = 1, replicas = [1,2,], isr = [1,2,], Partition(topic = statistic_test, partition = 4, leader = 2, replicas = [2,0,], isr = [0,2,], Partition(topic = statistic_test, partition = 7, leader = 2, replicas = [2,1,], isr = [1,2,], Partition(topic = statistic_test, partition = 8, leader = 0, replicas = [0,2,], isr = [0,2,]])

需要关注的是这处信息:

error_code=15,coordinator={node_id=-1,host=,port=-1}

看样子是kafka的连接出了问题。不过我的Producer向kafka写数据是没问题的,使用kafka提供的消费工具kafka-console-consumer.sh执行消费也是没问题的。

在网上找到了一些关于这个现象的解释:在客户端进行消费之前会为ConsumerGroup向Kafka集群申请coordinater节点。kafka集群在配置或分配coordinater节点的时候可能会短暂的报这个错误。

我这里不是短暂的报错,而是陷入了死循环。目前可以想到的就是我的kafka集群配置出现问题了。在简单粗暴地将zookeeper上kafka的配置完全删掉再重启Kafka后,消费可以正常执行了。至于问题具体出在哪儿还没有找到。目前只能是持续关注,等问题再次出现了。

就这样。

######


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Think Python

Think Python

Allen B. Downey / O'Reilly Media / 2012-8-23 / GBP 29.99

Think Python is an introduction to Python programming for students with no programming experience. It starts with the most basic concepts of programming, and is carefully designed to define all terms ......一起来看看 《Think Python》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试