对 Spark 中 DAGScheduler 阶段划分的一次探索

栏目: 服务器 · 发布时间: 7年前

内容简介:对 Spark 中 DAGScheduler 阶段划分的一次探索

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

几个概念

对 Spark 中 DAGScheduler 阶段划分的一次探索

  • Narrow Dependency指的是 child RDD只依赖于parent RDD(s)固定数量的partition。

  • Wide Dependency指的是child RDD的每一个partition都依赖于parent RDD(s)所有partition。

下文中 窄依赖 就是指 Narrow Dependency, 宽依赖就是指 Wide Dependency,  宽依赖也称为 shuffle Dependency, 下文图中标识 S 打头,例如 S_X。

根据宽依赖和窄依赖, 整个job,会划分为不同的stage, 像是用篱笆隔开了一样, 如果中间有宽依赖,就用刀切一刀, 前后划分为两个 stage。

stage 分为两种, ResultStage 和 ShuffleMapStage, spark job 中产生结果最后一个阶段生成的stage 是ResultStage , 中间阶段生成的stage 是 ShuffleMapStage

属于 ResultStage 的Task都是 ResultTask , 属于 ShuffleMapStage 的Task都是 ShuffleMapTask

依赖链生成过程

spark 中 DAGScheduler 进行阶段划分主要使用以下几个函数,  在调用过程中使用了递归的思想。函数调用关系如下:

creatResultStage               getMissingParentStages
           |                          |
           |                          |
           v                          | 
getOrCreateParentStages <-------------|--------|
          |  获取所有的shuffle依赖      |        |
          v                           |        |
getOrCreateShuffleMapStage<-----------|        |
         |  当前以及所有存在的上游stage都要创建     |                                         
         |                                     |
         v                                     | 
createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己

我们先看几个函数实现

  • creatResultStage  创建一个 ResultStage,  整个调用的起始点, 会在 finalRDD 上调用 creatResultStage,  加入我们的依赖链条是

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

这样的,就会在 E 这个 RDD 上调用 creatResultStage, 会先创建所有父亲stage, 调用 getOrCreateParentStages, 返回 List[stage]作为父 stage后创建自己 的 stage 作为 ResultStage。

  • getOrCreateParentStages

这个函数调用 getShuffleDependencies 获取到所有的直接 Shuffle Dependecy,  然后遍历去调用getOrCreateShuffleMapStage 创建直接父亲 stage。

举例: getOrCreateParentStages(E), 会获取 E的所有直接 宽依赖 List(S_A, S_C), 然后 对 S_A 和 S_C 调用 getOrCreateShuffleMapStage。

  • getShuffleDependencies

获取所有的直接 getShuffleDependencies ,  getShuffleDependencies(rdd), 一直追溯rdd的依赖直到依赖类型为ShuffleDenpendency,如果碰到 NarrowDependency 就继续往前追溯,这个方法实现了广度遍历的过程。而且它只返回rdd的直属父shuffle依赖,祖先shuffle依赖不返回。

举个例子

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

如果是  getShuffleDependencies(E), 则 返回的是 List(S_A, S_C),  追溯 rdd_E 的依赖, 碰到 S_A 为 ShuffleDenpendency, 加入集合, 碰到 N_D 为 NarrowDependency, 则继续追溯 D 的依赖, 碰到 S_C 为 ShuffleDenpendency, 加入集合, 最终返回 List(S_A, S_C)。

过程如下:

对 Spark 中 DAGScheduler 阶段划分的一次探索

  • getOrCreateShuffleMapStage

创建好的 stage 都会 把 (shuffleId, stage)映射放入 shuffleIdToMapStage, getOrCreateShuffleMapStage 函数中判断如果映射已经在  shuffleIdToMapStage 中了, 直接取出对应的 stage 返回, 如果没有, 则会调用  getMissingAncestorShuffleDependencies, 获取当前 ShuffleDenpendency 的rdd的所有祖先 ShuffleDenpendency, 遍历调用 createShuffleMapStage 创建 上游的 stage,  然后调用 createShuffleMapStage 创建自己的 stage

举例:

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

getOrCreateShuffleMapStage(S_C),  因为 stage 第一次创建,shuffleIdToMapStage 中不存在映射, 所以需要对 rdd_C 调用 getMissingAncestorShuffleDependencies, 获取所有祖先宽依赖 list(S_B),  然后遍历 list(S_B), 对 S_B 调用 createShuffleMapStage, 返回后, 再对自己 S_C 调用 createShuffleMapStage。

  • getMissingAncestorShuffleDependencies

获取一个 rdd的所有祖先 ShuffleDenpendency,  返回一个 祖先 ShuffleDenpendency 的栈, 注意这里栈数据排放的顺序保证了要保证,  直接父亲宽依赖在栈中总是放在后代宽依赖的上面, 直接父亲宽依赖会被先取出创建 stage, 然后映射放入 shuffleIdToMapStage中, 保证了创建后代stage的时候, 后代 stage 总是可以直接从 shuffleIdToMapStage中拿到直接父亲的stage。

例子:

   A <--s---C<---s---E<----s------,
                                    \ 
    B <--s-- D <--s-- F <--s---H--n--`--I

getMissingAncestorShuffleDependencies(I) 最终会返回  stack(S_A, S_C, S_B, S_D, S_E, S_F, S_I), 注意栈的顺序, S_A位于栈顶。

对 Spark 中 DAGScheduler 阶段划分的一次探索

过程如下:

  • getMissingParentStages,  获取所有的直接父亲 stage

整个流程

假如我们有一个 spark job 依赖关系如下,

对 Spark 中 DAGScheduler 阶段划分的一次探索

我们对上图抽象一下, 本质如下:

E <-------n------,
                  \
C <--n---D---n-----F--s---,
                           \
A <-------s------ B <--n----`-- G

还记得函数调用关系么, 我这里再贴一下, 方便对照

creatResultStage               getMissingParentStages
           |                          |
           |                          |
           v                          | 
getOrCreateParentStages <-------------|--------|
          |  获取所有的shuffle依赖      |        |
          v                           |        |
getOrCreateShuffleMapStage<-----------|        |
         |  当前以及所有存在的上游stage都要创建     |                                         
         |                                     |
         v                                     | 
createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己

我们对着调用关系串起来整个流程

最终结果

  • 对 G 调用 creatResultStage,通过getOrCreateParentStages 获取所有的 parents:List[stage],  作为所有直接父亲stage,创建本身的 ResultStage。 getOrCreateParentStages会先创建上游 stage1 stage2,  然后创建自己的 stage3

  • getOrCreateParentStages 会调用 getShuffleDependencies 获得 rdd_G 所有直接宽依赖 HashSet(S_F, S_A), 然后遍历集合,对  S_F 和 S_A 调用 getOrCreateShuffleMapStage,

  • 对 S_A 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_A 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_A 调用 createShuffleMapStage,  由于rdd_A 没有parent stage 直接就创建 stage1 返回。

  • 对 S_F 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_F 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_F 调用 createShuffleMapStage,  由于rdd_F 没有parent stage 直接就创建 stage2 返回。

  • 把 List(stage1,stage2) 作为 stage3 的 parents stages 创建 stage3

stage parents
S_A stage1 List()
S_F stage2 List()
- ResultStage3 List(stage1, stage2)

ios 用户赞赏通道:

对 Spark 中 DAGScheduler 阶段划分的一次探索


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

RESTful Web Services Cookbook

RESTful Web Services Cookbook

Subbu Allamaraju / Yahoo Press / 2010-3-11 / USD 39.99

While the REST design philosophy has captured the imagination of web and enterprise developers alike, using this approach to develop real web services is no picnic. This cookbook includes more than 10......一起来看看 《RESTful Web Services Cookbook》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具