实战 | 数据湖中的流式数据摄取之DeltaSteamer

栏目: IT技术 · 发布时间: 3年前

内容简介:2、可以跳过集成测试模块3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2、可以跳过集成测试模块

<modules>
    <module>hudi-common</module>
    <module>hudi-cli</module>
    <module>hudi-client</module>
    <module>hudi-hadoop-mr</module>
    <module>hudi-hive-sync</module>
    <module>hudi-spark</module>
    <module>hudi-timeline-service</module>
    <module>hudi-utilities</module>
    <module>packaging/hudi-hadoop-mr-bundle</module>
    <module>packaging/hudi-hive-sync-bundle</module>
    <module>packaging/hudi-spark-bundle</module>
    <module>packaging/hudi-presto-bundle</module>
    <module>packaging/hudi-utilities-bundle</module>
    <module>packaging/hudi-timeline-server-bundle</module>
    <module>docker/hoodie/hadoop</module>
    <!--<module>hudi-integ-test</module>-->  <!--将此注释即可-->
  </modules>

3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

二、使用hudi自带的DeltaStreamer工具写数据到hudi,开启--enable-hive-sync 即可同步数据到hive表

DeltaStreamer启动命令

spark-submit --master yarn   \      
  --driver-memory 1G \
  --num-executors 2 \
  --executor-memory 1G \
  --executor-cores 4 \
  --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
  --props hdfs://../kafka.properties \ '启动delateStreamer所需要的配置文件'
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \  '这里我选择从kafka消费json数据格式'
  --target-base-path hdfs://../business \ 'hudi数据存储地址'
  --op UPSERT \
  --target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
  --enable-hive-sync \          '开启同步至hive'
  --table-type MERGE_ON_READ \
  --source-ordering-field create_time \
  --source-limit 5000000

kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

三、查询

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能

  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。

  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

MOR模式

如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

使用saprk查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
spark sql暂不支持

使用hive查询

beeline -u jdbc:hive2://.. -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
 #读优化查询
 select * from dwd.test_ro;
 #快照查询
 select * from dwd.test_rt;
 #增量查询
 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 set hoodie.test.consume.mode=INCREMENTAL;
 set hoodie.test.consume.max.commits=3;
 set hoodie.test.consume.start.timestamp=20200427114546;
 select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
 
 #注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,cloudera用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

COW模式

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

使用spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false

#快照视图
spark.sql("select count(*) from dwd.test").show()
//增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")
spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()  

使用hive查询

beeline -u jdbc:hive2://... -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
  #快照查询
  select count(*) from dwd.test;
  #增量查询
  set hoodie.test.consume.mode=INCREMENTAL;
  set hoodie.test.consume.max.commits=3;
  set hoodie.test.consume.start.timestamp=20200427114546;
  select count(*) from  dwd.test where `_hoodie_commit_time` > '20200427114546';

注意:我们需要把和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar放在hive辅助路径和hive/lib包下面。 cloud era用户需要在yarn配置界面,配置辅助路径地址(不支持hdfs目录)

实战 | 数据湖中的流式数据摄取之DeltaSteamer

实战 | 数据湖中的流式数据摄取之DeltaSteamer


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

长尾理论

长尾理论

[美]克里斯·安德森 / 中信出版集团股份有限公司 / 2015-8-1 / 59.00元

互联网时代,大众市场不再一统天下,小众市场也可以呼风唤雨。 在《长尾理论》一书中,克里斯·安德森详细阐释了长尾的精华所在,指出商业和文化的未来不在于传统需求曲线上那个代表“畅销商品”的头部,而是那条代表“冷门商品”的经常被人遗忘的长尾。尽管我们仍然对热门商品着迷,但它们对消费者的吸引力已经大不如从前,因为市场已经大大分化。黄金电视节目的收视率几十年来一直在萎缩,若是在七八十年代,现在的一档最......一起来看看 《长尾理论》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具