技术资讯 | Spark SQL优化案例分享

栏目: IT技术 · 发布时间: 3年前

内容简介:01引言

技术资讯 | Spark SQL优化案例分享

01

引言

当前,业务互联网大厂基于Spark计算引擎在大数据离线计算领域所占的比例越来越高。 结合当前我们自身所面临的资源紧张,我们有必要有步骤的将传统的任务从MR计算引擎引擎切换到Spark计算引擎。

技术资讯 | Spark SQL优化案例分享

首先我们从如下两种计算引擎的基本架构图来分析,

分别为:Spark/MapReduce的性能

1.1

MapReduce

技术资讯 | Spark SQL优化案例分享   处理效率低效 :

  • Map/Reduce任务中间结果写磁盘,多个MR之间通过HDFS交换数据,任务调度和启动开销大;

  • 一条 SQL 语句经常被拆分成多个Application,数据在多个Application之间只能通过读写HDFS交换;

  • 无法充分利用内存。

1.2

S park

技术资讯 | Spark SQL优化案例分享  

高效(比MapReduce快几倍到几十倍)

  • 内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销,另外为了解决纯内存计算带来的数据可靠性,引入了Checkpoint机制;

  • DAG引擎,减少多次计算之间中间结果写到HDFS的开销;

  • Executor使用线程池模型来减少task启动开销,shuffle过程中避免 不必要的sort操作以及减少磁盘IO操作。

1.3

Spark Engine架构图

技术资讯 | Spark SQL优化案例分享  

  • Spark Core (Spark基本数据结构)

  • Spark Streaming (微批处理)

  • MLib (机器学习)

  • GraphX (图计算)

  • Spark SQL (SQL结构化语言处理)

1.4

Spark SQL 应用场景

技术资讯 | Spark SQL优化案例分享  

Spark SQL的使用场景广泛:传统的数据库,NoSQL数据库,大数据领域的其他存储系统都可以使用Spark SQL访问

02

影响 Spark任务快慢的因素

技术资讯 | Spark SQL优化案例分享

  • 数据量 (GB级 vs TB级)

  • 数据组织形式 (存储结构,压缩算法,数据Schema[Map等复杂结构])

  • 小文件 (很多的KB级,10M级)

  • 内存 (内存偏少,大量溢写)

  • Core个数 (并发量小,shuffle等待)

  • 算法 (聚合因子,过滤条件,SQL组织形式 )

  • 发生大量的Shuffle (可用Broadcast替换)

  • 是否使用缓存 (将经常访问的数据缓存在Executor内存中) 

2.1

Executor

建议不要自己配置Executor个数,使用 动态分配模式

技术资讯 | Spark SQL优化案例分享

概念

根据当前的负载动态的增加或者删除Executor,这样做的好处在于:

在业务组的队列资源 (vcore, memory) 资源恒定情况下,能更好的均衡各个业务的对资源的占用。也就是对于一个计算量较小的任务不用占用太多资源,而对于一个计算量较大的任务,也能从集群中获取相对较多的资源。

而采用指定模式,则会导致任务在获取足够多 (可通过参数设置比例) 的Executor之前一直处于等待状态,而这通常会浪费计算资源。

2.1.1Executor动态分配模型:

技术资讯 | Spark SQL优化案例分享

ExecutorAllocationManager内部会定时根据工作负载计算所需的Executor数量:

  • 如果任务对Executor需求数量大于之前向集群管理器申请的Executor数量,那么向Yarn申请添加Executor;

  • 如果任务对Executor需求数量小于之前向集群管理器申请的Executor数量,那么向Yarn申请取消部分Executor;

  • ExecutorAllocationManager内部还会定时向Yarn申请移除(杀死)过期的Executor。

2.1.1Executor固定分配:

在执行job之前,executor资源申请到的数量要达到 80%(默认),可通过参数:

技术资讯 | Spark SQL优化案例分享

2.2

Core

技术资讯 | Spark SQL优化案例分享

spark.executor.cores (默认值1) ,在默认情况下spark.task.cpus (每个task使用的core个数也为1)

  • 建议executor的cpu core数量设置为2 ~ 3个比较合适

    (同时伴随需要调整 spark.executor.memory);

  • 在队列有大量任务提交的情况下,还要更少,以免影响其他用户提交的任务因申请不到cpu资源而卡主。

2.3

Memory

2.3.1 统一内存模型

技术资讯 | Spark SQL优化案例分享

2.3.2 spark.executor.memory

建议: 每个Executor的每个core分配的内存设置4g较为合适。用户设置该值的时候需要考虑如下影响因子:

  • 自己使用的executor-memory * num-executor所使用的资源不能超过所提交队列的阈值;

  • 在队列资源共用的模式下,所申请的资源还要更小,以免申请不到资源或者阻塞其他用户的任务;

  • 用户申请的executor-momory不能超过yarn设置的最大值,当前设置的最大值为60g。

Storage Memory 这片内存区域是为了解决:

block cache (Rdd.cache, rdd.persist等方法) ,还有就是broadcasts,以及task results的存储。 可以通过参数设置,如果你大量调用了持久化操作或广播变量,那可以适当调高它; 参数:

spark.storage.memoryFraction (默认值:0.6)

Execution Memory 这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer; 参数:

spark.shuffle.memoryFraction (默认值:0.2)

User Memory:应用程序本身执行需要的内存

2.3.2.1 统一内存模型动态调节机制

技术资讯 | Spark SQL优化案例分享

根据应用的不同可自己动态调整,但通常情况下不需要调整,使用默认值即可。上图展示的是Storage内存与Execution的内存动动态调节机制。

2.3.3 spark.executor.memoryOverhead

主要用于JVM自身,字符串, NIO Buffer等开销:

  • 拉取远端的RDD Block;

  • RDD.persist(StorageLevel.DISK_ONLY)

RDD.persit(StorageLevel.MEMORY_AND_DISK)

RDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

等含有disk level的cache RDD操作

对于Driver: 拉取Executor端Task Result数据回Driver节点时, 此处消耗的DirectMemory内存 = conf.getInt("spark.resultGetter.threads", 4) * TaskResultSize

2.4

Shuffle并行度

技术资讯 | Spark SQL优化案例分享  

2.4.1spark.default.parallelism

该参数用于设置每个stage的默认task数量,这个参数极为重要,如果不设置可能会直接影响你的任务性能。 (只有在处理RDD时才会起作用,对Spark SQL无效)

建议: 500 ~ 1000较为合适。

技术资讯 | Spark SQL优化案例分享

2.4.2spark.sql.shuffle.partitions

用于配置 join 或聚合操作shuffle数据时使用的分区数(则是对sparks SQL专用的设置,目前不用自己设置,使用:

技术资讯 | Spark SQL优化案例分享

2.5

存储结构

目前HADOOP中常用的数据存储结构包括:

Text (行式存储)

CSV (行式存储)

RCFile (列式存储)

ORC (列式存储)

Parquet (列式存储)

目前Spark默认存储的格式为Parquet。 下图展示的是相同数据以不同存储结构存储,存储文件的Size对比:

技术资讯 | Spark SQL优化案例分享  

2.5.1列式存储的好处:

  • 查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列,这样可以将I/O消耗降低N倍,另外可以保存每一列的统计信息 (min、max、sum等) ,实现部分的谓词下推;

  • 由于每一列的成员都是同构的,可以针对不同的数据类型使用更高效的数据压缩算法,进一步减小I/O;

  • 由于每一列的成员的同构性,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效;

  • 由于列式存储数据量更小,Spark的Task读取数据的时间更短,不光节省计算资源,还节省存储资源。

技术资讯 | Spark SQL优化案例分享  

建表是可通过如下方式指定存储格式:

CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET

2.5.2 向量化读 

列式存储的向量化操作,相对于行式存储一行一行的操作,列式存储可做到一个batch一个batch的操作,这样的操作方式极大的提升了运算性能。

技术资讯 | Spark SQL优化案例分享  

技术资讯 | Spark SQL优化案例分享

从Spark 2.3开始,Spark使用新ORC文件格式的向量化的ORC reader来支持ORC文件。为此,新添加了以下配置。

当spark.sql.orc.impl设置为native,并且spark.sql.orc.enableVectorizedReader设置为true时:

向量化reader用于原生ORC表对于Hive ORC serde表,

当spark.sql.hive.convertMetastoreOrc也设置为true时,使用向量化reader。

原生ORC表 例如:

使用USING ORC子句创建的表

Hive ORC serde表 例如:

使用USING HIVE OPTIONS(fileFormat'ORC')子句创建的表

2.5.2.1 向量化读限制

对于不同的数据存储格式需要满足不同的条件才能使用向量化阅读

ORC格式 需要满足以下条件:

  • 开启spark.sql.orc.enableVectorizedReader: 默认true;

  • 开启spark.sql.codegen.wholeStage: 默认true, 并且其scheme的长度不大于wholeStageMaxNumFields(默认100列), 参数:spark.sql.codegen.maxFields,可设置;

  • [关键]所有列数据类型需要为AtomicType类型的。

Parquet格式 需要满足以下条件:

  • 开启spark.sql.orc.enableVectorizedReader: 默认true;

  • 开启spark.sql.codegen.wholeStage: 默认true;

  • [关键]所有列数据类型需要为AtomicType类型的。

AtomicType 是指:

Map,Array,Object,UDT等复杂结构体之外的类型。

2.6

压缩方式 (gzip, bzip2, lzo, snappy)

技术资讯 | Spark SQL优化案例分享  

技术资讯 | Spark SQL优化案例分享  

通过上述两张图得出以下结论:

  • CSV,Text的低效性;

  • 压缩比不压缩性能高;

  • 不同压缩格式性能有所差异 。

2.6.1 snappy压缩前后对比  

Snappy压缩前后比例为3:1

技术资讯 | Spark SQL优化案例分享

2.6.2 压缩优点 

对于Spark任务来说,压缩的数据带来的好处是显而易见的:

  • 大幅节省内存

  • 大幅节省磁盘

  • 大幅节省数据读取时间

2.6.3 设置压缩格式

Text 格式

技术资讯 | Spark SQL优化案例分享

Parquet格式

技术资讯 | Spark SQL优化案例分享

2.7

小文件

所谓小文件,我们定义为小于一个HDFS Block大小一下的文件。

案例1:

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

2.7.1 缺点 

  • 小文件太多,导致每个task读取的数据量较小,计算的时间很短;

  • 执行的时间不足以弥补JVM启动的时间;

  • 由于集群中NameNode节点需要维护文件的元数据信息,太多的输出小文件会给集群的NameNode带来巨大的压力;

2.7.2 解决方案 

可通过控制task个数的方式来对输出数据重分区,通过这种方式可以达到减少或者扩大task个数的目的,从而控制输出文件数量。

2.8

控制task个数

2.8.1 Repartition 

使用hint将会使得输入数据进行重新Repartition,调节最终task的个数以及输出文件的个数:

技术资讯 | Spark SQL优化案例分享

通过重分区将减少或者增大分区数量以达到增加或减少task的数量,从而增大或者减少Task输出的文件个数。

案例:

1.没有repartition

技术资讯 | Spark SQL优化案例分享

时间:

2.repartition

技术资讯 | Spark SQL优化案例分享

时间:

2.8.2 控制每个task处理的数据量 

Orc, parquet格式

其他格式

技术资讯 | Spark SQL优化案例分享

通过调整上述参数可调整每个task处理数据的大小,从而调整task的个数。调整这些会对任务的执行性能带来一些改变,也能在一定程度上解决小文件问题。

2.8

下推 (PushDownPredicate)

技术资讯 | Spark SQL优化案例分享   上图中:

  • 方式1从磁盘中读取出所有的数据,在内存中过滤;

  • 方式2,3将过滤从内存中下推到磁盘,在扫描磁盘的数据的时候就过滤掉数据。

概念: 所谓下推是指将过滤尽可能地下沉到数据源端,从而避免从磁盘读取不必要数据。

下推与不下推性能对比 (DataBricks官方)

通过上图可知: 60%的下推比不下推的性能提高了2~18倍

2.9.1 SQL分区裁剪 

技术资讯 | Spark SQL优化案例分享

目前Spark 在第一次读取Hive表数据时,会全量查询一次Hive数据以便拿到元数据信息,这样会list所有分区,这是个社区的bug已经在修复中。

2.9.2 谓词下推的限制 

  • 只有operator 包含的所有expression都是确定性的时候才可以下推, 比如 rand 表达式等等;

  • Filter 的字段必须要在group by 的维度字段里面,举个例子:

①下面的聚合是 可以谓词下推 的:

select a, count(*) as c from t1 group by a  where a ==“1"

②下面的聚合是 不可以谓词下推 的:  

select  count(*) as  c  from t1 where c ==  “10”

select a, count(b) as c   from t1  group by a  where c == “10“

案例

Regexp是 非确定 的,导致下推失败

regexp导致非确定性

确定性

技术资讯 | Spark SQL优化案例分享  

二者的执行计划不同--不带regexp的SQL的执行计划显示在数据源扫描阶段执行过滤数据的操作;而带有regexp的SQL的执行计划显示是先把所有的数据都扫描完了才做过滤。

2.10

广播

对该变量 (表) 进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

可适当调整广播变量大小的阈值,使得稍微大一些的数据也能被广播:

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享  

案例:

2.10.1 关闭Broadcast 

技术资讯 | Spark SQL优化案例分享  

耗时:

2.10.2 开启Broadcast 

技术资讯 | Spark SQL优化案例分享

耗时:

通过案例可以看出: 开启Broadcast比不开启 B roadcast 程序性能提升 1倍。

2.11

缓存

2.11.1 缓存原则 

将数据缓存在内存中, 遵循的原则:

  • 数据重复使用

  • 重新生成这部分数据的代价昂贵

权衡cache与否的代价,不cache则多次使用同一份数据都需要重新计算一次。Cache则只会计算一次,但是会占用executor的内存资源,那是否应该cache就是把计算RDD,从hdfs上获取数据的时间资源与缓存数据的内存资源之间进行权衡。

技术资讯 | Spark SQL优化案例分享  

技术资讯 | Spark SQL优化案例分享

rdd1,rdd2不需要缓存

技术资讯 | Spark SQL优化案例分享

rdd可以缓存,rdd1,rdd2不需要缓存

2.11.2 使用方式 

对View缓存:

2.12

表结构嵌套

Spark SQL 处理嵌套类型数据时,存在以下问题:

  • 读取大量不必要的数据:

对于 Parquet / ORC 等列式存储格式,可只读取需要的字段,而直接跳过其它字段,从而极大节省 IO。而对于嵌套数据类型的字段,如下图中的 Map 类型的 people 字段,往往只需要读取其中的子字段,如 people.age。却需要将整个 Map 类型的 people 字段全部读取出来然后抽取出 people.age 字段。这会引入大量的无意义的 IO 开销。如果是几百个 Key,这也就意味着 IO 被放大了几十至几百倍。

  • 无法进行向量化读取:

而向量化读能极大的提升性能。但截止到目前,  Spark 不支持包含嵌套数据类型的向量化读取。这极大地影响了包含嵌套数据类型的查询性能。

  • 不支持 Filter 下推:

Spark 不支持嵌套类型字段上的 Filter 的下推。

  • 重复计算:

JSON 字段,在 Spark SQL 中以 String 类型存在,严格来说不算嵌套数据类型。不过实践中也常用于保存不固定的多个字段,在查询时通过 JSON Path 抽取目标子字段,而大型 JSON 字符串的字段抽取非常消耗 CPU。对于热点表,频繁重复抽取相同子字段非常浪费资源。

案例1:

技术资讯 | Spark SQL优化案例分享  

这张表是业务用户的表结构,用户行为数据以Json形式上报。由于表的结构实在太过复杂,Column字段存在大量的Map结构,分析层面很难通过简单的SQL语句来分析这行数据,只能以读取HDFS,在代码层面来做数据分析。

案例2:

用户需要查询某个年龄段的人群,第一张表需要读取people整个struct,而第二张表则只需要基于age过滤。

2.13

SQL标准化

SQL语句中的Column类型一定要与Hive数仓中的表的Column类型保持一致,Spark SQL相对Hive SQL来说对于语法的检查更为严格。

案例

Partition中dayno是String类型,此处是Int类型,导致需要做转换,耗费CPU计算资源。

技术资讯 | Spark SQL优化案例分享

查询值类型与Hive表字段类型一致

技术资讯 | Spark SQL优化案例分享  

耗时

查询值类型与Hive表字段类型不一致

技术资讯 | Spark SQL优化案例分享

    技术资讯 | Spark SQL优化案例分享

二者执行计划不一样,如果字符串不匹配会先去做cast类型转换,然后才比较。

03

结语

通过不断的优化SQL,优化表结构,优化数据存储格式等等措施,一定能让Spark SQL的性能得到极大的提升。

投稿 | 大数据平台

编辑 | sea

排版 | sea

往期推荐

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

技术资讯 | Spark SQL优化案例分享

在看点一下 大家都知道


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

九败一胜

九败一胜

李志刚 / 北京联合出版公司 / 2014-9-1 / 42.00元

所有的创业者都面临着很多问题,困惑不是个人的,是有共性的。 除了自身去摸索着石头走路,他们还可以通过学习,从那些在创业路上走得更远的创业者身上学到经验、教训。 这本书的主角——王兴,恰好就是一个很好的学习对象。出生于1979年的王兴,很早就创业了,2004他就开始和同学一块创业,2005年做出了校内网;2007年,他又做出了饭否网——这是中国最早的类似twitter的网站。 ......一起来看看 《九败一胜》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具