Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

栏目: IT技术 · 发布时间: 4年前

内容简介:继之前入门篇和Batch篇之后,今天这篇Flink on Zeppelin主要讲述如何在Zeppelin中使用Flink的Streaming功能,我们会以2个主要的场景来讲:本文我们会用Kafka作为我们的数据源,使用Flink Sql处理Kafka中的某个topic数据,然后写入到另外一个Kafka Topic。为了使用Flink的Kafka connector,你需要在Flink Interpreter中配置flink.execution.packages。

继之前入门篇和Batch篇之后,今天这篇Flink on Zeppelin主要讲述如何在Zeppelin中使用Flink的Streaming功能,我们会以2个主要的场景来讲:

  • Streaming ETL

  • Streaming Data Analytics

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

准备工作

本文我们会用Kafka作为我们的数据源,使用Flink Sql处理Kafka中的某个topic数据,然后写入到另外一个Kafka Topic。为了使用Flink的Kafka connector,你需要在Flink Interpreter中配置flink.execution.packages。

  • flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10-SNAPSHOT,org.apache.flink:flink-connector-kafka-base_2.11:1.10-SNAPSHOT,org.apache.flink:flink-json:1.10-SNAPSHOT

本文使用的kafka数据源是json格式,所以需要添加 org.apache.flink:flink-json

另外本文的例子会使用这个docker compose来创建Kafka Cluster, https://github.com/xushiyan/kafka-connect-datagen/

需要运行下面2个命令来启动kafka集群和创建kafka topic

docker-compose up -d

curl -X POST http://localhost:8083/connectors \

-H 'Content-Type:application/json' \

-H 'Accept:application/json' \

-d @connect.source.datagen.json

具体请参考这个官方链接 https://kafka-connect-datagen.readthedocs.io/en/latest/

Streaming ETL

接下里我们会用Flink SQL来做基于Kafka的Streaming ETL。首先我们需要创建Kafka source table代表kafka中的源数据。

%flink.ssql


DROP TABLE IF EXISTS source_kafka;


CREATE TABLE source_kafka (

status STRING,

direction STRING,

event_ts BIGINT

) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = 'generated.events',

'connector.startup-mode' = 'earliest-offset',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'earliest-offset',

'format.type'='json',

'update-mode' = 'append'

);

然后创建Kafka sink table,代表清洗后的数据 (这里我们定义了WATERMARK,是为了下一步做基于window的流式数据分析)。

%flink.ssql


DROP TABLE IF EXISTS sink_kafka;


CREATE TABLE sink_kafka (

status STRING,

direction STRING,

event_ts TIMESTAMP(3),

WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = 'generated.events2',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.group.id' = 'testGroup',

'format.type'='json',

'update-mode' = 'append'

)

接下里我们就可以用Insert Into语句来做Streaming ETL的工作了。

%flink.ssql


insert into sink_kafka select status, direction, cast(event_ts/1000000000 as timestamp(3)) from source_kafka where status <> 'foo'

这条Insert into语句非常简单,我们过滤掉了status为foo的数据,以及将event_ts 转化为timestamp类型。

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

然后可以用select语句来预览sink table中的数据来确认Streaming ETL正常工作。

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

Streaming Data Analytics

在完成了上面的Streaming ETL工作之后,我们就可以在Zeppelin中做流式数据分析了。在Zeppelin中可以用Select语句来做Flink 流数据分析,Select的结果会push到Zeppelin前端展示,可以用来做流式数据的dashboard。

Zeppelin支持3种模式的流式数据分析:

  • Single 模式

  • Update 模式

  • Append 模式

Single模式

Single模式适合当输出结果是一行的情况,比如下面的Select语句。这条 Sql 语句永远只有一行数据,但这行数据会持续不断的更新。这种模式的数据输出格式是html形式,用户可以用template来指定输出模板,{i} 是第 i 列的placeholder。

%flink.ssql(type=single, parallelism=1, refreshInterval=3000, template=<h1>{1}</h1> until <h2>{0}</h2>)


select max(event_ts), count(1) from sink_kafka

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

Update 模式

Update模式适合多行输出的情况,比如下面的select group by语句。这种模式会定期更新这多行数据,输出是Zeppelin的table格式,所以可以用Zeppelin自带的可视化控件。

%flink.ssql(type=update, refreshInterval=2000, parallelism=1)


select status, count(1) as pv from sink_kafka group by status

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

Append模式

Append模式适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况。比如下面的基于窗口的group by语句。Append模式要求第一列数据类型是timestamp,这里的start_time就是timestamp类型。

%flink.ssql(type=append, parallelism=1, refreshInterval=2000, threshold=60000)


select TUMBLE_START(event_ts, INTERVAL '5' SECOND) as start_time, status, count(1) from sink_kafka

group by TUMBLE(event_ts, INTERVAL '5' SECOND), status

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇

更多Flink SQL资料

本文只是简单介绍如何在Zeppelin中使用Flink Streaming SQL,关于更多Flink SQL请参考Flink官方文档

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html

Zeppelin on Flink 系列

Zeppelin on Flink (1) 入门篇

Zeppelin on Flink (2) Batch 篇

如果有碰到任何问题,请加入下面这个钉钉群讨论。 后续我们会有更多Tutorial的文章,敬请期待。

Zeppelin入门 | Flink on Zeppelin (3) - Streaming篇


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法引论

算法引论

[美]乌迪·曼博(Udi Manber) / 黄林鹏、谢瑾奎、陆首博、等 / 电子工业出版社 / 2010-1 / 36.00元

本书是国际算法大师乌迪·曼博(Udi Manber)博士撰写的一本享有盛誉的著作。全书共分12章:第1章到第4章为介绍性内容,涉及数学归纳法、算法分析、数据结构等内容;第5章提出了与归纳证明进行类比的算法设计思想;第6章到第9章分别给出了4个领域的算法,如序列和集合的算法、图算法、几何算法、代数和数值算法;第10章涉及归约,也是第11章的序幕,而后者涉及NP完全问题;第12章则介绍了并行算法;最后......一起来看看 《算法引论》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具