Flink AsyncFunction导致的Kafka数据不消费

栏目: IT技术 · 发布时间: 4年前

内容简介:flinksql从kafka读取数据,异步函数加载Mysql数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。

Flink AsyncFunction导致的Kafka数据不消费

问题描述

flinksql从kafka读取数据,异步函数加载 Mysql 数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。

  • 不读kafka数据:kafka读取线程像卡住一样,从kafka中读取不到数据,以为是网络原因,但是计算节点和工作节点在同一台机器中,于是排除网络原因。

  • 持续读kafka数据,但是异步函数不下发数据:以为是设置的异步超时间超时,默认是10s,增大超时时间后依然不下发。

Jstack 排查

打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。

Flink AsyncFunction导致的Kafka数据不消费

原来是调用我们的timeout函数出现了阻塞。

public void timeout(Row input, ResultFuture<Row> resultFuture) {
        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
        try {
        // 阻塞等待
            if (null == future.get()) {
                resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
            }
        } catch (Exception e) {
            resultFuture.completeExceptionally(new Exception(e));
        }
    }

阻塞原因

在flink异步函数asyncInvoke中,只处理了正常逻辑。也就是匹配上调用 resultFuture.complete(rowList); 但是fillData里面进行数据类型转换时很容易发生异常,当发生异常时,resultFuture并没有结果输出,从而导致整个链路阻塞。

List<Row> rowList = Lists.newArrayList();
      for (Object jsonArray : (List) val.getContent()) {
          Row row = fillData(input, jsonArray);
          rowList.add(row);
       }
  resultFuture.complete(rowList);

解决以及注意事项

fillData进行try-catch捕获发生异常时调用 resultFuture.completeExceptionally(exception);

在flink异步函数中, resultFuture.complete() 只会被调用一次,complete一个集合需要先在填充然后一次性发送,而不是通过遍历调用多次 resultFuture.complete()

使用异步Future一定要记得有输出值。

堆栈信息重点关注有没有我们自己的逻辑 。

欢迎点赞+收藏+转发朋友圈素质三连

Flink AsyncFunction导致的Kafka数据不消费

Flink AsyncFunction导致的Kafka数据不消费

文章不错? 点个【 在看 】吧!   :point_down:


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创业就是要细分垄断

创业就是要细分垄断

李开复、汪华、傅盛 / 文化发展出版社 / 2017-5-1 / CNY 45.00

对各方面资源极为有限的创业公司而言,想在激烈的市场竞争中站立下来的第一步是:成为细分市场的垄断者。不管是资本还是尖端人才,追逐的永远是行业里尖端的企业,第二名毫无意义。 首先,要精准定位潜在市场。这个市场的需求仍没有被满足,并且潜力巨大。其次,抓住时代和行业的红利,通过高速增长实现“小垄断”,抢滩登陆。最后,在细分领域里建立起自己的竞争壁垒,应对巨头和竞争对手的复制,去扩展更大的市场,从而扩......一起来看看 《创业就是要细分垄断》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

html转js在线工具
html转js在线工具

html转js在线工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具